RocksDB segfaults


RocksDB segfaults

Florian König
Hi,

I have experienced two crashes of Flink 1.2 due to a SIGSEGV caused by something in RocksDB. What is the preferred way to report them? All I have at the moment are two hs_err_pid12345.log files, each over 4000 lines long. Is there anything significant that I should extract to help you and/or put into a JIRA ticket?

The first thing that came to my mind was the stack traces (see below). Anything else?

Thanks
Florian

----

Stack: [0x00007fec04341000,0x00007fec04442000],  sp=0x00007fec0443ff48,  free space=1019k
Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 10252  org.rocksdb.RocksDB.get(J[BIJ)[B (0 bytes) @ 0x00007fec925887cc [0x00007fec92588780+0x4c]
J 27241 C2 org.apache.flink.contrib.streaming.state.RocksDBValueState.value()Ljava/lang/Object; (78 bytes) @ 0x00007fec94010ca4 [0x00007fec940109c0+0x2e4]
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Long;+7
j  com.micardo.backend.TransitionProcessor$2.getValue()Ljava/lang/Object;+1
J 38483 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(Ljava/io/DataOutput;Lorg/apache/flink/runtime/metrics/dump/QueryScopeInfo;Ljava/lang/String;Lorg/apache/flink/metrics/Gauge;)V (114 bytes) @ 0x00007fec918eabf0 [0x00007fec918eabc0+0x30]
J 38522 C2 org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;Ljava/util/Map;)Lorg/apache/flink/runtime/metrics/dump/MetricDumpSerialization$MetricSerializationResult; (471 bytes) @ 0x00007fec94eb6260 [0x00007fec94eb57a0+0xac0]
J 47531 C2 org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(Ljava/lang/Object;)V (453 bytes) @ 0x00007fec95ca57a0 [0x00007fec95ca4da0+0xa00]
J 5815 C2 akka.actor.UntypedActor.aroundReceive(Lscala/PartialFunction;Ljava/lang/Object;)V (7 bytes) @ 0x00007fec91e3ae6c [0x00007fec91e3adc0+0xac]
J 5410 C2 akka.actor.ActorCell.invoke(Lakka/dispatch/Envelope;)V (104 bytes) @ 0x00007fec91d5bc44 [0x00007fec91d5b9a0+0x2a4]
J 6628 C2 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(Lscala/concurrent/forkjoin/ForkJoinTask;)V (60 bytes) @ 0x00007fec9212d050 [0x00007fec9212ccc0+0x390]
J 40130 C2 scala.concurrent.forkjoin.ForkJoinWorkerThread.run()V (182 bytes) @ 0x00007fec923f8170 [0x00007fec923f7fc0+0x1b0]
v  ~StubRoutines::call_stub

--------------------------------------

Stack: [0x00007f167a5b7000,0x00007f167a6b8000],  sp=0x00007f167a6b5f40,  free space=1019k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native code)
C  [libstdc++.so.6+0xc026b]  std::basic_string<char, std::char_traits<char>, std::allocator<char> >::basic_string(std::string const&)+0xb
C  [librocksdbjni8426686507832168508.so+0x2f14ca]  rocksdb::BackupEngine::Open(rocksdb::Env*, rocksdb::BackupableDBOptions const&, rocksdb::BackupEngine**)+0x3a
C  [librocksdbjni8426686507832168508.so+0x180ad5]  Java_org_rocksdb_BackupEngine_open+0x25
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x00007f16cb79aa36 [0x00007f16cb79a980+0xb6]
J 49809 C1 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap; (416 bytes) @ 0x00007f16cd2733d4 [0x00007f16cd2719a0+0x1a34]
J 51766 C2 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap; (40 bytes) @ 0x00007f16cb40a1fc [0x00007f16cb40a1a0+0x5c]
J 50547 C2 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState; (206 bytes) @ 0x00007f16cb8be89c [0x00007f16cb8be7e0+0xbc]
J 52232 C2 org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z (650 bytes) @ 0x00007f16cbfbbf60 [0x00007f16cbfbb540+0xa20]
J 52419 C2 org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V (25 bytes) @ 0x00007f16cbdd2624 [0x00007f16cbdd25c0+0x64]
J 41649 C2 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z (439 bytes) @ 0x00007f16cc8aed5c [0x00007f16cc8add40+0x101c]
J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V (42 bytes) @ 0x00007f16cbdc21d0 [0x00007f16cbdc20c0+0x110]
j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
j  org.apache.flink.runtime.taskmanager.Task.run()V+700
j  java.lang.Thread.run()V+11
v  ~StubRoutines::call_stub
V  [libjvm.so+0x68c616]
V  [libjvm.so+0x68cb21]
V  [libjvm.so+0x68cfc7]
V  [libjvm.so+0x723d80]
V  [libjvm.so+0xa69dcf]
V  [libjvm.so+0xa69efc]
V  [libjvm.so+0x91d9d8]
C  [libpthread.so.0+0x8064]  start_thread+0xc4

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 50030  org.rocksdb.BackupEngine.open(JJ)J (0 bytes) @ 0x00007f16cb79a9c4 [0x00007f16cb79a980+0x44]
J 49809 C1 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot(JJ)Ljava/util/HashMap; (416 bytes) @ 0x00007f16cd2733d4 [0x00007f16cd2719a0+0x1a34]
J 51766 C2 org.apache.flink.contrib.streaming.state.RocksDBStateBackend.snapshotPartitionedState(JJ)Ljava/util/HashMap; (40 bytes) @ 0x00007f16cb40a1fc [0x00007f16cb40a1a0+0x5c]
J 50547 C2 org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(JJ)Lorg/apache/flink/streaming/runtime/tasks/StreamTaskState; (206 bytes) @ 0x00007f16cb8be89c [0x00007f16cb8be7e0+0xbc]
J 52232 C2 org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(JJ)Z (650 bytes) @ 0x00007f16cbfbbf60 [0x00007f16cbfbb540+0xa20]
J 52419 C2 org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(Lorg/apache/flink/runtime/io/network/api/CheckpointBarrier;)V (25 bytes) @ 0x00007f16cbdd2624 [0x00007f16cbdd25c0+0x64]
J 41649 C2 org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(Lorg/apache/flink/streaming/api/operators/OneInputStreamOperator;Ljava/lang/Object;)Z (439 bytes) @ 0x00007f16cc8aed5c [0x00007f16cc8add40+0x101c]
J 33374% C2 org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run()V (42 bytes) @ 0x00007f16cbdc21d0 [0x00007f16cbdc20c0+0x110]
j  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke()V+301
j  org.apache.flink.runtime.taskmanager.Task.run()V+700
j  java.lang.Thread.run()V+11
v  ~StubRoutines::call_stub




Re: RocksDB segfaults

Stephan Ewen
Hi!

It looks like you are running the RocksDB state backend from Flink 1.1 (is an old version still packaged into your JAR file?).

This line indicates it: org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot (the method no longer exists in 1.2).

Can you try running 1.2 and see if the crash still occurs? In general, I cannot vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it has been stable.
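
If it helps, here is a hypothetical one-off snippet (not from this thread) that prints which jar the RocksDB state backend class is loaded from, which makes an accidentally bundled old Flink dependency easy to spot:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class WhichRocksDBBackend {
    public static void main(String[] args) {
        // Print the code source of RocksDBStateBackend; an old Flink 1.1 jar
        // shadowing the 1.2 dependency would show up here.
        java.security.CodeSource source =
                RocksDBStateBackend.class.getProtectionDomain().getCodeSource();
        System.out.println(source != null ? source.getLocation() : "(bootstrap classpath)");
    }
}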

Stephan




Re: RocksDB segfaults

Florian König
Hi Stephan,

you are right, the second stack trace is indeed from a run of Flink 1.1.4. Sorry, my bad.

That leaves us with the first trace of a segfault, which I can guarantee brought down a 1.2.0 instance. Unfortunately I cannot reproduce the problem. It has happened twice so far, but I can’t see any pattern. Is there anything in the stack trace that could point us to a probable cause?

Florian




Re: RocksDB segfaults

Stefan Richter
Hi,

For the first crash, from the stack trace I assume that the backend is not accessed as part of processing an element, but from another thread. Is that correct? RocksDB requires accessing threads to hold the task’s checkpointing lock, otherwise they might call methods on an instance that has already been disposed. However, this should only happen when the task was already about to shut down anyway. Is that a plausible explanation for the behaviour you observed? I also cannot rule out that segfaults happen inside RocksDB itself or in the JNI bridge.
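
As a rough illustration of that locking rule (a placeholder object stands in for the task’s checkpointing lock, which is internal to the StreamTask and not something user code normally gets hold of):

class CheckpointLockSketch {

    // Placeholder only: illustrates the discipline, not the real Flink lock.
    private final Object checkpointLock = new Object();

    void readStateFromAnotherThread(Runnable stateRead) {
        synchronized (checkpointLock) {
            // While this lock is held, the task cannot concurrently dispose the
            // RocksDB instance backing the state, so the read cannot hit a
            // disposed native handle.
            stateRead.run();
        }
    }
}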

Best,
Stefan



Re: RocksDB segfaults

rmetzger0
Florian, can you post the log of the TaskManager where the segfault happened?




Re: RocksDB segfaults

Florian König
Hi,

@Robert: I have uploaded all the log files that I could get my hands on to https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I tried to remove all unrelated messages logged by the job itself. In flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and the last half hour before the segfault.

@Stefan: Your theory could be the key. In the stack trace I see a call to org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge that later results in a call to com.micardo.backend.tail.TransitionProcessor$2.getValue(). Here’s a slimmed-down snippet of the relevant code:

class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {

    transient ValueState<IdSet> headSeenState;

    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    try {
                        return headSeenState.value().count();
                    } catch (IOException e) {
                        e.printStackTrace();
                        return 0L;
                    }
                }
            });
    }

}

FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a reference to that state via the RuntimeContext of the RichFunction passed as 'this' in the above code.
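
For reference, a hypothetical reconstruction of such a helper (the real FlinkUtil was not posted here), using the Flink 1.2-era descriptor API:

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

public final class FlinkUtil {

    // Hypothetical sketch only; uses the constructor that still takes a
    // default value, which the 1.2 API provides.
    public static <T> ValueState<T> getStateHandle(RichFunction function, String name, Class<T> type) {
        ValueStateDescriptor<T> descriptor = new ValueStateDescriptor<>(name, type, null);
        return function.getRuntimeContext().getState(descriptor);
    }
}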

Further along in the stack trace I see that headSeenState.value() results in a call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() and then to org.rocksdb.RocksDB.get(J[BIJ).

It looks like part of the metrics system asynchronously reads the value of the gauge and needs RocksDB for that. Is it possible that this thread does not hold the checkpointing lock you were talking about?

Best regards
Florian





Re: RocksDB segfaults

Stephan Ewen
The code will not work properly, sorry.

The value returned by the state is whatever is stored under the key for which the function was called the last time.
In addition, the unsynchronized access is most likely causing the RocksDB fault.

TL;DR: The "ValueState" / "ListState" / etc. in Flink are not intended to be used across threads.
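
A minimal sketch of one way around this, reusing the names from the snippet above (Transition, Reaction, IdSet and FlinkUtil are assumed to be the types from your job): the task thread mirrors the count into a plain AtomicLong while processing elements, and the gauge only reads that field, never the RocksDB-backed state:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {

    transient ValueState<IdSet> headSeenState;
    // Written only on the task thread, read by the metrics thread.
    transient AtomicLong headSeenCount;

    @Override
    public void open(final Configuration parameters) throws Exception {
        headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);
        headSeenCount = new AtomicLong();

        getRuntimeContext()
            .getMetricGroup()
            .gauge("head-seen", new Gauge<Long>() {
                public Long getValue() {
                    // No RocksDB access here, just a volatile read.
                    return headSeenCount.get();
                }
            });
    }

    @Override
    public void flatMap(Transition transition, Collector<Reaction> out) throws Exception {
        // ... existing processing that updates headSeenState ...
        // Mirror the count into the plain field while still on the task thread.
        headSeenCount.set(headSeenState.value().count());
    }
}

With keyed state, the gauge then reports the count for the key processed most recently on that subtask, roughly the same per-operator semantics as before, just without touching RocksDB from the metrics thread.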







Re: RocksDB segfaults

Florian König
Thank you Stephan for spotting the problem. In hindsight it’s obvious that this can never work. I’ll figure something out :)
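
For anyone landing on this thread later, here is a minimal sketch of the restructuring Stephan's advice points to: the gauge reads only a plain field that the operator's processing thread keeps up to date, so the metrics thread never touches RocksDB-backed state. This is not the actual fix used in this job; Transition, Reaction, IdSet and FlinkUtil are the job-specific names from the snippet quoted further down, and the rest is an assumption.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {

        transient ValueState<IdSet> headSeenState;

        // Written only from flatMap() on the task thread, read by the metrics thread.
        private transient AtomicLong headSeenCount;

        public void open(final Configuration parameters) throws Exception {
                headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);
                headSeenCount = new AtomicLong();

                getRuntimeContext()
                        .getMetricGroup()
                        .gauge("head-seen", new Gauge<Long>() {
                                public Long getValue() {
                                        // No state access here, just a read of a plain counter.
                                        return headSeenCount.get();
                                }
                        });
        }

        public void flatMap(final Transition transition, final Collector<Reaction> out) throws Exception {
                // ... update headSeenState as before, then mirror the count so the
                // gauge can report it without going through RocksDB:
                headSeenCount.set(headSeenState.value().count());
        }
}

A plain volatile long field would work just as well; the AtomicLong is only there to make the cross-thread visibility explicit.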

> On 24.03.2017 at 10:28, Stephan Ewen <[hidden email]> wrote:
>
> The code will not work properly, sorry.
>
> The value returned by the state is whatever is stored under the key for which the function was last called.
> In addition, the unsynchronized access is most likely causing the RocksDB fault.
>
> TL;DR: The "ValueState" / "ListState" / etc. in Flink are not intended to be used across threads.
>
>
>
> On Fri, Mar 24, 2017 at 8:28 AM, Florian König <[hidden email]> wrote:
> Hi,
>
> @Robert: I have uploaded all the log files that I could get my hands on to https://www.dropbox.com/sh/l35q6979hy7mue7/AAAe1gABW59eQt6jGxA3pAYaa?dl=0. I tried to remove all unrelated messages logged by the job itself. In flink-root-jobmanager-0-micardo-dev.log I kept the Flink startup messages and the last half hour before the segfault.
>
> @Stefan: Your theory could be the key. In the stack trace I see a call to org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge that results in a call to com.micardo.backend.tail.TransitionProcessor$2.getValue()Ljava/lang/Long later. Here’s a slimmed-down snippet of the relevant code:
>
> class TransitionProcessor extends RichFlatMapFunction<Transition, Reaction> {
>
>         transient ValueState<IdSet> headSeenState;
>
>         public void open(final Configuration parameters) throws Exception {
>                 headSeenState = FlinkUtil.getStateHandle(this, "head-seen", IdSet.class);
>
>                 getRuntimeContext()
>                         .getMetricGroup()
>                         .gauge("head-seen", new Gauge<Long>() {
>                                 public Long getValue() {
>                                         try {
>                                                 return headSeenState.value().count();
>                                         } catch (IOException e) {
>                                                 e.printStackTrace();
>                                                 return 0L;
>                                         }
>                                 }
>                         });
>         }
> …
> }
>
> FlinkUtil.getStateHandle instantiates a ValueStateDescriptor and acquires a reference to that state via the RuntimeContext of the RichFunction passed as 'this' in the above code.
>
> Further along in the stack trace I see that headSeenState.value() results in a call to org.apache.flink.contrib.streaming.state.RocksDBValueState.value() and then to org.rocksdb.RocksDB.get(J[BIJ).
>
> It looks like part of the metrics system asynchronously reads the value of the gauge and needs RocksDB for that. Is it possible that this thread does not hold the checkpointing lock you were talking about?
>
> Best regards
> Florian
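
FlinkUtil.getStateHandle itself is not shown anywhere in the thread. A plausible reconstruction, purely an assumption about what such a helper might look like, is a small utility that registers a ValueStateDescriptor under the given name and fetches the keyed state from the function's RuntimeContext:

import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

public final class FlinkUtil {

        // Hypothetical reconstruction of the helper used above: register a state
        // descriptor (with a null default value) and return the keyed state handle
        // of the calling rich function.
        public static <T> ValueState<T> getStateHandle(final RichFunction function, final String name, final Class<T> type) {
                final ValueStateDescriptor<T> descriptor = new ValueStateDescriptor<>(name, type, null);
                return function.getRuntimeContext().getState(descriptor);
        }

        private FlinkUtil() {
        }
}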
>
>
> > On 22.03.2017 at 18:19, Stefan Richter <[hidden email]> wrote:
> >
> > Hi,
> >
> > for the first checkpoint, from the stack trace I assume that the backend is not accessed as part of processing an element, but by another thread. Is that correct? RocksDB requires accessing threads to hold the task’s checkpointing lock, otherwise they might call methods on an instance that is already disposed. However, this should only happen when the task was already about to shut down anyway. Is that a plausible explanation for your observed behaviour? I can also not rule out that segfaults can happen inside RocksDB or due to the JNI bridge.
> >
> > Best,
> > Stefan
> >
> >> On 22.03.2017 at 16:53, Florian König <[hidden email]> wrote:
> >>
> >> Hi Stephan,
> >>
> >> you are right, the second stack trace is indeed from a run of Flink 1.1.4. Sorry, my bad.
> >>
> >> That leaves us with the first trace of a segfault for which I can guarantee that it brought down a 1.2.0 instance. Unfortunately I cannot reproduce the problem. It has happened twice so far, but I can’t see any pattern. Is there anything in the stack trace that could point us to a probable cause?
> >>
> >> Florian
> >>
> >>> On 22.03.2017 at 16:00, Stephan Ewen <[hidden email]> wrote:
> >>>
> >>> Hi!
> >>>
> >>> It looks like you are running the 1.1 RocksDB state backend (is an old version still packaged into your JAR file?)
> >>>
> >>> This line indicates that: org.apache.flink.contrib.streaming.state.RocksDBStateBackend.performSemiAsyncSnapshot (the method does not exist in 1.2 any more)
> >>>
> >>> Can you try and run 1.2 and see if that still occurs? In general, I cannot vouch 100% for RocksDB's JNI API, but in our 1.2 tests so far it was stable.
> >>>
> >>> Stephan
> >>>
> >>>
> >>>
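
One quick, ad-hoc way to check which flink-statebackend-rocksdb JAR a job actually loads (not something from this thread, just a generic classpath check) is to print where the RocksDBStateBackend class was loaded from:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class WhichRocksDBBackend {

        public static void main(final String[] args) {
                // Prints the location of the JAR that provides RocksDBStateBackend,
                // which reveals whether an old version is still bundled in the fat JAR.
                System.out.println(RocksDBStateBackend.class
                        .getProtectionDomain()
                        .getCodeSource()
                        .getLocation());
        }
}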
> >>> On Wed, Mar 22, 2017 at 3:13 PM, Florian König <[hidden email]> wrote:
> >>> [quoted original post snipped; the full text and stack traces appear in the first message of this thread]
> >>
> >>
> >
>
>
>