Blocked requesting MemorySegment when Segments are available.

David Maddison
Hi,

I keep running into a situation where a task is blocked getting a MemorySegment from the pool, even though the TaskManager reports that it has plenty of MemorySegments available.

I'm completely stumped as to how to debug this or what to look at next, so any hints/help/advice would be greatly appreciated!

/David/

The situation is as follows (Flink 1.10.0):

I have two operators. The first, "enrich-events", is stuck forever trying to get a memory segment so it can send records to the downstream operator "Test Function":

"read-events -> enriched-events (1/1)" #61 prio=5 os_prio=0 tid=0x00007f6424091800 nid=0x13b waiting on condition [0x00007f644acf0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000d2206000> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
	at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:231)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:209)
	at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:145)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:116)
	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)

All the operator tasks are running on the same TaskManager, and the TaskManager reports that it has 6,517 memory segments available, so it's confusing why the task would be blocked getting a memory segment.

Memory Segments
Type        Count
Available   6,517
Total       6,553

Even more confusing, the downstream task appears to be waiting for data, so I would assume that credit-based flow control isn't what's causing the back pressure.

"Test Function (1/1)" #62 prio=5 os_prio=0 tid=0x00007f6424094000 nid=0x13c waiting on condition [0x00007f644abf0000]
   java.lang.Thread.State: WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000000c91de1d0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
	at org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:146)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)

Re: Blocked requesting MemorySegment when Segments are available.

Zhijiang(wangzhijiang999)
Hi David,

First, I want to clarify two points based on the information you provided.

1. If all the tasks are running on the same TaskManager, credit-based flow control is not involved. The downstream operator consumes the upstream's data directly in memory, with no need for a network shuffle.
2. Available buffers at the TaskManager level do not mean that an individual task has available buffers on its input or output side. For example, the output side of the "enrich-events" operator has at most 10 buffers. Once these buffers are exhausted, the operator blocks regardless of how many buffers are still available on the TaskManager.
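To make point 2 concrete, here is a minimal Java sketch of a per-task pool that is capped independently of a much larger TaskManager-wide pool. It is a toy model, not Flink's actual LocalBufferPool: the classes BoundedPoolDemo, GlobalPool and LocalPool are hypothetical, the 6,553 total is taken from the metrics above, and the cap of 10 is the figure from point 2 (presumably 1 channel × 2 buffers per channel + 8 floating buffers under the 1.10 defaults, but treat that breakdown as an assumption). Because nothing recycles the segments, the eleventh request times out even though thousands of segments remain free globally.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical toy model, not Flink code: a capped per-task pool over a large global pool. */
public class BoundedPoolDemo {

    /** TaskManager-wide pool, modelled as a simple counter of free segments. */
    static final class GlobalPool {
        private int available;

        GlobalPool(int total) { this.available = total; }

        synchronized boolean take() {
            if (available == 0) return false;
            available--;
            return true;
        }

        synchronized void give() { available++; }

        synchronized int available() { return available; }
    }

    /** Per-task pool: may hold at most maxSegments, regardless of global availability. */
    static final class LocalPool {
        private final GlobalPool global;
        private final Semaphore permits; // one permit per segment this task may still hold

        LocalPool(GlobalPool global, int maxSegments) {
            this.global = global;
            this.permits = new Semaphore(maxSegments);
        }

        /** Waits up to timeoutMs; returns false where a truly blocking request would keep waiting. */
        boolean request(long timeoutMs) {
            try {
                if (!permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
                    return false; // per-task cap reached
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (!global.take()) {
                permits.release(); // the TaskManager pool is empty as well
                return false;
            }
            return true;
        }

        /** Called once the consumer has finished with a segment. */
        void recycle() {
            global.give();
            permits.release();
        }
    }

    public static void main(String[] args) {
        GlobalPool global = new GlobalPool(6_553);
        LocalPool output = new LocalPool(global, 10);

        int granted = 0;
        // Nobody recycles, as if the downstream task never consumed the buffers.
        while (output.request(100)) granted++;

        // prints "granted=10, globalAvailable=6543"
        System.out.println("granted=" + granted + ", globalAvailable=" + global.available());
    }
}
```

With the timeout replaced by an unbounded wait, the eleventh request would park indefinitely, which is roughly the shape of the "enrich-events" stack trace: the task waits on its own pool, not on the TaskManager's free count.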

Considering your case, could you double-check whether buffers have accumulated in the output of the "enrich-events" operator (the "outputQueueLength" metric), and whether the "numRecordsIn"/"numBytesIn" metrics of the "Test Function" operator are greater than 0? I want to rule out a buffer leak on the upstream side and a missing partition request on the downstream side. Then we can further narrow down whether the input availability notification on the downstream side has a bug that leaves it stuck forever.
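The two checks suggested above can be read as a small decision procedure. The Java sketch below encodes one possible interpretation, under assumptions: BlockedProducerTriage and diagnose are hypothetical names, not Flink APIs, and the metric values would be read manually (e.g. from the web UI) while the producer is blocked. Queued output buffers argue against an upstream leak, a non-zero numRecordsIn argues against a missing partition request, and if both checks pass, the downstream input availability notification becomes the main suspect.

```java
/** Hypothetical triage helper; the categories mirror the checks above and are not Flink APIs. */
public class BlockedProducerTriage {

    /**
     * @param outputQueueLength producer's queued output buffers, observed while it is blocked
     * @param numRecordsIn      records the downstream task has received so far
     */
    static String diagnose(int outputQueueLength, long numRecordsIn) {
        if (outputQueueLength == 0) {
            // Producer is blocked, yet nothing is queued for the consumer:
            // the buffers may have leaked on the upstream side.
            return "possible upstream buffer leak";
        }
        if (numRecordsIn == 0) {
            // Data is queued, but the consumer has never received anything:
            // the downstream side may never have requested the partition.
            return "possible missing partition request";
        }
        // Data is queued and has flowed before, yet the consumer sits idle:
        // suspect the downstream input availability notification.
        return "suspect input availability notification";
    }

    public static void main(String[] args) {
        System.out.println(diagnose(10, 0));     // possible missing partition request
        System.out.println(diagnose(10, 1_234)); // suspect input availability notification
        System.out.println(diagnose(0, 1_234));  // possible upstream buffer leak
    }
}
```

The ordering of the checks is a design choice: an empty output queue is tested first because, with no buffers queued, numRecordsIn says little about whether the downstream side ever connected.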

Best,
Zhijiang

------------------------------------------------------------------
From: David Maddison <[hidden email]>
Sent: Tuesday, June 9, 2020 19:28
To: user <[hidden email]>
Subject: Blocked requesting MemorySegment when Segments are available.
