Re: Threads waiting on LocalBufferPool
Posted by
Maciek Próchniak on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Threads-waiting-on-LocalBufferPool-tp6257p6356.html
On 21/04/2016 16:46, Aljoscha Krettek
wrote:
Hi,
I would be very happy about improvements to our RocksDB
performance. What are the RocksDB Java benchmarks that you are
running? In Flink, we also have to serialize/deserialize every
time that we access RocksDB using our TypeSerializer. Maybe
this is causing the slow down.
Hi Aljoscha,
I'm using benchmark from:
https://github.com/facebook/rocksdb/blob/master/java/jdb_bench.sh
My value is a pretty simple Scala case class - around 12 fields with
Int/Long/String values - so I think serialization shouldn't be a big
problem. However, I'll have to do more comprehensive tests to be sure
I'm comparing apples to apples - I hope to find time for that during
the weekend :)
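Roughly the kind of check I have in mind for the serialization side - timing Flink's own TypeSerializer for the value type (the case class here is just a placeholder for the real one, and class names may differ slightly between Flink versions):

import java.io.ByteArrayOutputStream

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.scala._
import org.apache.flink.core.memory.DataOutputViewStreamWrapper

// placeholder for the real ~12-field case class
case class MyValue(id: Long, count: Int, name: String)

object SerializerProbe {
  def main(args: Array[String]): Unit = {
    // grab the same TypeSerializer Flink would use for this type
    val serializer = createTypeInformation[MyValue].createSerializer(new ExecutionConfig)
    val value = MyValue(1L, 42, "some-name")

    val bytes = new ByteArrayOutputStream(1024)
    val out = new DataOutputViewStreamWrapper(bytes)

    val n = 1000000
    val start = System.nanoTime()
    var i = 0
    while (i < n) {
      bytes.reset()
      serializer.serialize(value, out)
      i += 1
    }
    println(s"avg serialize: ${(System.nanoTime() - start) / n} ns/op")
  }
}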
thanks,
maciek
By the way, what is the type of the value stored in the RocksDB
state? Maybe the TypeSerializer for that value is very slow.
Cheers,
Aljoscha
Well...
I found some time to look at RocksDB performance.
It takes around 0.4 ms to look up the value state and 0.12 ms to
update it - these are means; the 95th percentile was > 1 ms for
get... When I set additional options:
.setIncreaseParallelism(8)
.setMaxOpenFiles(-1)
.setCompressionType(CompressionType.SNAPPY_COMPRESSION)
I managed to get
0.05 ms for update and 0.2 ms for get - but that still seems
pretty bad compared to the standard RocksDB Java benchmarks that
I ran on the same machine, which give:
fillseq    : 1.23238 micros/op; 89.8 MB/s; 1000000 ops done; 1 / 1 task(s) finished.
readrandom : 9.25380 micros/op; 12.0 MB/s; 1000000 / 1000000 found; 1 / 1 task(s) finished.
fillrandom : 4.46839 micros/op; 24.8 MB/s; 1000000 ops done; 1 / 1 task(s) finished.
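Roughly what I have in mind for a more apples-to-apples check - a crude standalone probe against plain RocksDB with the same options (the path, key count and value size are made up):

import org.rocksdb.{CompressionType, Options, RocksDB}

import scala.util.Random

object RocksDbProbe {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()

    // same options as above
    val options = new Options()
      .setCreateIfMissing(true)
      .setIncreaseParallelism(8)
      .setMaxOpenFiles(-1)
      .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
    val db = RocksDB.open(options, "/tmp/rocksdb-probe") // throwaway path

    val value = new Array[Byte](200) // rough guess at the serialized case class size
    val n = 100000
    val rnd = new Random(0)

    var i = 0
    val t0 = System.nanoTime()
    while (i < n) { db.put(s"key-$i".getBytes("UTF-8"), value); i += 1 }
    val t1 = System.nanoTime()
    i = 0
    while (i < n) { db.get(s"key-${rnd.nextInt(n)}".getBytes("UTF-8")); i += 1 }
    val t2 = System.nanoTime()

    println(s"put: ${(t1 - t0) / n} ns/op, get: ${(t2 - t1) / n} ns/op")
    // not bothering with cleanup in a throwaway probe
  }
}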
I guess I'll have to look at it a bit more...
thanks anyway,
maciek
On 21/04/2016 08:41, Maciek Próchniak wrote:
Hi Ufuk,
thanks for the quick reply.
Actually I had a little time to try both things.
1) helped only temporarily - it just took a bit longer to
saturate the pool. After a few minutes, all Kafka threads were
periodically waiting for the buffer pool again.
2) This seemed to help. I also reduced the checkpoint interval
- with RocksDB we had 5 min, now I tried 30 s.
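The interval change itself is only a one-liner on the environment, something like:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000) // 30 s checkpoint interval (was 5 min with RocksDB)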
I attach throughput metrics - the former (around 18:00) is
with increased heap & buffers, the latter (around 22:00)
is with the FileSystemStateBackend.
My state is a few GB large - during the test it reached
around 2-3 GB. I must admit I was quite impressed that
checkpointing to HDFS using the FileSystemStateBackend took only about
6-7 s (with occasional spikes to 12-13 s, which can be seen
on the metrics - I didn't check whether it was caused by HDFS or
something else).
Now I looked at the logs from around 18:00 and it seems that checkpointing
RocksDB took around 2-3 minutes:
2016-04-20 17:47:33,439 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 6 @ 1461167253439
2016-04-20 17:49:54,196 [flink-akka.actor.default-dispatcher-147] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 6 (in 140588 ms)
- however, I don't see any threads dumping state in the
thread stacks...
I guess I'll have to add some metrics around state
invocations to see where the problem with RocksDB is...
I'll write if I find anything, but I don't think that will be
today...
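Something along these lines is what I mean by metrics around state invocations - a hypothetical wrapper that just times the calls to the underlying ValueState ('record' would feed whatever metrics library we use):

import org.apache.flink.api.common.state.ValueState

// Hypothetical wrapper - times the calls going to the underlying (RocksDB-backed) state.
class TimedValueState[T](delegate: ValueState[T], record: Long => Unit) extends ValueState[T] {

  override def value(): T = {
    val start = System.nanoTime()
    try delegate.value()
    finally record(System.nanoTime() - start)
  }

  override def update(v: T): Unit = {
    val start = System.nanoTime()
    try delegate.update(v)
    finally record(System.nanoTime() - start)
  }

  override def clear(): Unit = delegate.clear()
}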
Btw - I was looking at the FS state backend and I wonder whether it
would be feasible to make a variant of this state that uses an immutable
map (probably a Scala one) in order to do asynchronous
checkpoints.
The synchronous part would then be essentially free - just
taking a reference to the state map and materializing it asynchronously.
Of course, that would only work for immutable state values - but
this is often the case when writing in Scala. WDYT?
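Roughly what I mean, as a plain-Scala sketch rather than actual Flink code:

import scala.concurrent.{ExecutionContext, Future}

// Sketch of the idea: the synchronous part of a checkpoint only grabs a
// reference to the current immutable map; writing it out happens in the background.
class AsyncSnapshotState[K, V](implicit ec: ExecutionContext) {
  @volatile private var current: Map[K, V] = Map.empty

  def update(key: K, value: V): Unit =
    current = current.updated(key, value)

  def snapshot(write: Map[K, V] => Unit): Future[Unit] = {
    val frozen = current      // synchronous part: just a reference, O(1)
    Future(write(frozen))     // materialization runs asynchronously
  }
}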
thanks,
maciek
On 20/04/2016 16:28, Ufuk Celebi wrote:
Could be different things
actually, including the parts of the network
you mentioned.
1)
Regarding the TM config:
- It can help to increase the number of network buffers (you can go
ahead and give them 4 GB, e.g. 131072 buffers à 32 KB).
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size.
As a first step you could address these two points and
re-run your job.
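In flink-conf.yaml that would be roughly the following (assuming a single TaskManager and the default 32 KB segment size; the heap value is just an example):

taskmanager.heap.mb: 61440                    # e.g. ~60 GB of the 120 GB RAM
taskmanager.network.numberOfBuffers: 131072   # 131072 x 32 KB = 4 GB
taskmanager.memory.segment-size: 32768        # default segment size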
2)
As a follow-up you could also work with the
FileSystemStateBackend,
which keeps state in memory (on-heap) and writes
checkpoints to files.
This would help in checking how much RocksDB is slowing
things down.
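Switching is a one-liner on the environment, e.g. (the checkpoint URI is only an example):

import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))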
I'm curious about the results. Do you think you will
have time to try this?
– Ufuk
On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak wrote:
Hi,
I'm running my Flink job on one rather large machine
(20 cores with hyperthreading, 120 GB RAM). The task manager has
20 GB of heap allocated.
The job does more or less:
read CSV from Kafka -> keyBy one of the fields -> some custom state
processing.
The Kafka topic has 24 partitions, so my parallelism is
also 24.
After some tweaks and upgrading to 1.0.2-rc3 (as I use the
RocksDB state backend) I reached a point where throughput is
~120-150k/s.
On the same Kafka and machine I reached > 500k/s
with a simple filtering job,
so I wanted to see what the bottleneck is.
It turns out that quite often all of the Kafka threads are
stuck waiting for a buffer from the pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0
tid=0x00007f77fd80d000
nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object
monitor)
at java.lang.Object.wait(Native Method)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- locked <0x00000002eade3890> (a
java.util.ArrayDeque)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
- locked <0x00000002eb73cbd0> (a
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at
scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
- locked <0x00000002eaf3eb50> (a
java.lang.Object)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
- locked <0x00000002eaf3eb50> (a
java.lang.Object)
This seems a bit weird to me, as most of the state
processing threads are idle:
"My custom function -> (Sink: Unnamed, Map)
(19/24)" #7353 daemon prio=5
os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on
condition
[0x00007f7bee8ed000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for
<0x00000002eb840c38> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
I tried using more network buffers, but it doesn't
seem to change anything - and if I understand
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
correctly, I should not need more than 24^2 * 4 of them...
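(Using the formula from that page - #slots-per-TM^2 * #TMs * 4 - with 24 slots on a single task manager, that is 24^2 * 1 * 4 = 2304 buffers, i.e. about 2304 * 32 KB ≈ 72 MB.)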
Has anybody encountered such a problem? Or maybe it's
just normal for such a case...
thanks,
maciek