Hello all, I am running Flink 1.4.0 on Amazon EMR, and find that asynchronous snapshots fail when using the filesystem state backend. Synchronous snapshots succeed, and RocksDB snapshots succeed (both async and sync), but async filesystem snapshots fail with this error:

java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.set(ArrayList.java:448)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:56)
        at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:875)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:710)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
        at org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:282)
        at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:306)
        at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.enqueueSegmentAssignment(JoinSegmentMappingWithSegmentAssignments.scala:102)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:218)
        at net.districtm.segmentsync.processing.JoinSegmentMappingWithSegmentAssignments.processElement2(JoinSegmentMappingWithSegmentAssignments.scala:76)
        at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement2(KeyedCoProcessOperator.java:86)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:270)
        at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)

This stack trace occurs when I am trying to access the value of a
`ValueState[scala.collection.mutable.PriorityQueue[(AJavaObjectThatUsesTwitterChillProtoSerialization, Long, scala.collection.mutable.BitSet)]]`
while a checkpoint is going on.

I have found similar errors in already-fixed tickets like this one:
https://issues.apache.org/jira/browse/FLINK-7484
which is part of this umbrella issue:
https://issues.apache.org/jira/browse/FLINK-7830

These tickets are marked as resolved, so maybe the bug has not been completely fixed? Or maybe I am making a programming mistake? When I get the value of the state, I do mutate it, and I also mutate the mutable.BitSet before persisting it again. As far as I know that is perfectly fine with Flink, yes?
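For context, the state access boils down to this pattern (a simplified sketch; the type aliases, the number `42`, and the descriptor wiring are illustrative stand-ins for my actual code):

    import org.apache.flink.api.common.state.ValueState
    import scala.collection.mutable

    object StateAccessSketch {
      // Stand-in for the Java class serialized via twitter/chill protobuf support.
      type ProtoEvent = AnyRef
      type Entry = (ProtoEvent, Long, mutable.BitSet)

      // `queueState` would be created in open() via
      // getRuntimeContext.getState(new ValueStateDescriptor(...)).
      def enqueueSegmentAssignment(
          queueState: ValueState[mutable.PriorityQueue[Entry]],
          entry: Entry): Unit = {
        // On the heap-based backend, value() may deep-copy the stored object
        // via its serializer while a snapshot is in flight (see stack trace).
        val queue = queueState.value()
        queue.enqueue(entry)      // mutate the retrieved queue in place
        entry._3 += 42            // the mutable.BitSet is mutated as well
        queueState.update(queue)  // persist the mutated queue back into state
      }
    }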
Thanks for any help or pointers!
Thanks for reporting this, and also for verifying that this works with RocksDB and with synchronous checkpoints.
My assumption would be that the issue lies not in the serializer itself, but in accidental sharing of the serializer during the FsStateBackend's async snapshots. Do you know if the issue still exists in Flink 1.4.2?
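In case it helps with reproducing: the failing mode is the asynchronous-snapshot flag of the filesystem backend, enabled roughly like this (the checkpoint path and interval are only examples):

    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object ReproSetup extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.enableCheckpointing(60000L) // checkpoint every 60s, for example

      // The second constructor argument toggles asynchronous snapshots:
      // `true` is the failing mode reported here, `false` is the synchronous
      // mode that works.
      env.setStateBackend(new FsStateBackend("s3://my-bucket/checkpoints", true))
    }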
Hi,
I agree that this looks like a serializer being shared between two threads, one of them being the event processing loop. I doubt that the problem is in the async fs backend itself, because there is code in place that duplicates all serializers for the async snapshot thread, and this is also checked by a test. Furthermore, the code of this backend has not been changed in ways that could affect this behaviour for many months, and there have been no similar reports. The fact that it works for async snapshots with RocksDB does not have any implications for the fs backend, because the RocksDB backend does not use any serializers to write snapshots.

Mutating the BitSet objects should also not be a problem; that would not cause such an exception in a serializer. I find it more likely that there is still some problem with the duplication of stateful serializers, especially since your serializer is a rather complex, nested structure. I have quickly checked through TraversableSerializer, CaseClassSerializer, and KryoSerializer but could not spot any obvious errors in their deep-copy code. There is, however, a (known and discussed) case where registering stateful default serializers for Kryo can cause problems. Are you registering default serializers for Kryo?

As a further step to debug this problem, could you run the job with debug logging on the Flink 1.5 RC? This would show whether the problem is still present, and Flink 1.5 would also generate better logs for debugging it.

Best,
Stefan
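P.S. To illustrate the point about default serializers: registering a serializer *instance* shares that one (possibly stateful) object, while registering by *class* lets Flink create fresh instances. A rough sketch, with made-up placeholder types standing in for the protobuf message class and the chill-protobuf serializer:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object KryoRegistrationSketch extends App {
      // Placeholder message type and serializer for illustration only.
      class MyProtoMessage(val payload: String) extends Serializable

      class MyProtoSerializer extends Serializer[MyProtoMessage] with Serializable {
        override def write(kryo: Kryo, out: Output, msg: MyProtoMessage): Unit =
          out.writeString(msg.payload)
        override def read(kryo: Kryo, in: Input, cls: Class[MyProtoMessage]): MyProtoMessage =
          new MyProtoMessage(in.readString())
      }

      val env = StreamExecutionEnvironment.getExecutionEnvironment

      // Registering by *class*: Flink instantiates the serializer itself, so
      // each Kryo instance gets its own copy. Prefer this form.
      env.getConfig.addDefaultKryoSerializer(
        classOf[MyProtoMessage], classOf[MyProtoSerializer])

      // Registering an *instance* shares this one object wherever the config
      // is used; that is only safe if the serializer holds no mutable state:
      // env.getConfig.addDefaultKryoSerializer(classOf[MyProtoMessage], new MyProtoSerializer)
    }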