Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna
Hi,
 
I am trying to run one large single job graph with more than 10k tasks. The graph has roughly this form:
DataSource -> Filter -> Map [...multiple]
  • Sink1
  • Sink2
I am using a parallelism of 10, with 1 slot per task manager and 32G of memory per TM. The JM is running with 8G.
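For context, the topology corresponds roughly to the following minimal DataSet sketch (paths, functions, and output formats here are placeholders for illustration, not our actual code):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class JobShapeSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(10); // parallelism of 10, as described above

            DataSet<Tuple2<String, Integer>> mapped = env
                .readTextFile("hdfs:///data/input")              // DataSource (placeholder path)
                .filter(line -> !line.isEmpty())                 // Filter
                .map(line -> Tuple2.of(line, 1))                 // Map ("Key Extractor")
                .returns(Types.TUPLE(Types.STRING, Types.INT));  // type hint needed for the lambda

            mapped.writeAsCsv("hdfs:///data/sink1");             // Sink1 (a Hadoop output format in our job)
            mapped.writeAsText("hdfs:///data/sink2");            // Sink2

            env.execute("large-single-job-graph");
        }
    }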
 
Everything starts up and runs fine until close to 6-7k tasks have completed (the exact number varies, and it is mostly the source/filter/map portion of the graph), after which the job simply hangs. I managed to connect to the task managers and take a thread dump just in time, and found the following deadlock on one of the TMs, which appears to be holding up everything else.
Could someone please take a look and advise whether there is anything I could do or try in order to fix this?
 
Below are the two isolated thread stacks that form the deadlock:
 
Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA waiting for monitor entry
        waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
          at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
          - locked <0x2dfd> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
          - locked <0x2da5> (a java.lang.Object)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
          at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
          at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
          at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
          at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
          at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
          at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
          at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
          at java.lang.Thread.run(Thread.java:745)
 
         
Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
        blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
        waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release lock on <0x2dfd> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
          at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
          at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
          at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
          at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
          - locked <0x2dfb> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
          - locked <0x2dfc> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
          at org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:211)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:142)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
          at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
          at java.lang.Thread.run(Thread.java:745)
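The two stacks form a textbook inconsistent lock-ordering cycle. As a minimal standalone illustration of the same shape (the locks below are stand-ins for Flink's internal deques <0x2dfb> and <0x2dfd>, not Flink code):

    public class LockOrderDemo {
        static final Object subpartitionLock = new Object(); // stands in for <0x2dfb>
        static final Object bufferPoolLock = new Object();   // stands in for <0x2dfd>

        public static void main(String[] args) {
            // Like Thread-1 (DataSink): takes the pool lock, then wants the subpartition lock.
            Thread sink = new Thread(() -> {
                synchronized (bufferPoolLock) {
                    pause();
                    synchronized (subpartitionLock) { /* releaseMemory() */ }
                }
            });
            // Like Thread-2 (Map): takes the subpartition lock, then wants the pool lock.
            Thread map = new Thread(() -> {
                synchronized (subpartitionLock) {
                    pause();
                    synchronized (bufferPoolLock) { /* recycle() */ }
                }
            });
            sink.start();
            map.start(); // with high probability, both threads now block forever
        }

        static void pause() {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
        }
    }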
 
Thanks,
Krishna.
 

RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna
Flink version is 1.5.3 / Hadoop 2.7.
Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Aljoscha Krettek
In reply to this post by Narayanaswamy, Krishna
Hi,

This looks like a potential Flink bug. Looping in Nico and Piotr, who have looked into this code in the past. Could you please comment on that?

Best,
Aljoscha

Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Piotr Nowojski
Hi,

Thanks for reporting the problem. This bug was previously unknown to us. I have created a JIRA ticket for it: FLINK-10491.

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t know whether there is a hot fix or anything that can at least mitigate or decrease the probability of hitting the bug until we fix it properly.

Piotrek

Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
This deadlock does indeed exist for certain special scenarios.

Until the bug is fixed, you can work around it by not deploying the map and sink tasks in the same task manager.
Krishna, do you share a slot between these two tasks? If so, you can disable slot sharing for this job.

Alternatively, I think you can set ExecutionMode#PIPELINED_FORCED so that no blocking result partitions are generated, which should avoid this issue temporarily (see the sketch below).
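Forcing pipelined execution looks roughly like this (a sketch; ExecutionMode.PIPELINED_FORCED is the enum constant in org.apache.flink.api.common):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PipelinedForcedSketch {
        public static void main(String[] args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Force pipelined data exchanges so that no blocking (spillable)
            // result partitions -- the ones involved in this deadlock -- are created.
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);
        }
    }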

Best,
Zhijiang

RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna

We see the JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink v1.6.4, which we are using now, but the problem now comes up for relatively simpler scenarios as well. Deadlock dump below:

 

Java stack information for the threads listed above:
===================================================
"CoGroup (2/2)":
        at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
        - waiting to lock <0x000000062bf859b8> (a java.lang.Object)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
        at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
        - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
        at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
        at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
        at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
        at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
        - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
        - locked <0x000000063c785350> (a java.lang.Object)
        at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
        at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
        at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
        - locked <0x000000062bf859b8> (a java.lang.Object)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
        at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
        at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
        - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
        - locked <0x000000063fdf4888> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
        at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

 

We are not setting any slot-sharing parameters, since this is batch processing, so the job uses the defaults (and there do not seem to be any options for controlling slot sharing outside of streaming).

If we disable slot sharing (assuming that would be some job-wide config), wouldn’t the job become noticeably slower?

 

Thanks,

Krishna.

 

Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
I have analyzed this deadlock case based on the code. FLINK-10491 fixed one place in SpillableSubpartition that could cause a deadlock, but this is a different code path that triggers the same kind of issue.

When the source task is trying to release subpartition memory while another CoGroup task is being submitted, and the new task in turn triggers the source task to release its memory, the two can deadlock.

I will create a JIRA ticket for this issue and think about how to solve it soon. For now, if you still want to use the blocking result-partition type, the simplest way to avoid the deadlock is to configure only one slot per TM; then one task can never trigger another task in the same TM to release memory. Alternatively, you could increase the network buffer settings as a workaround, but I am not sure that will help in your case, because it depends on the total data size the source produces.
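Concretely, the one-slot workaround and a larger network buffer pool are flink-conf.yaml settings, roughly as below (key names as of Flink 1.5/1.6; the fraction value is only an example, not a tuned recommendation):

    taskmanager.numberOfTaskSlots: 1
    # and/or give the network stack a larger share of TM memory:
    taskmanager.network.memory.fraction: 0.2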

Best,
Zhijiang
------------------------------------------------------------------
From:Narayanaswamy, Krishna <[hidden email]>
Send Time:2019年5月17日(星期五) 17:37
To:Zhijiang(wangzhijiang999) <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4 which we are using now but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below -

 

Java stack information for the threads listed above:

===================================================

"CoGroup (2/2)":

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)

                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"CoGroup (1/2)":

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)

                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)

                at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)

                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)

                - locked <0x000000063c785350> (a java.lang.Object)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)

                - locked <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"DataSource  (1/1)":

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)

                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)

                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)

                at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)

                at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)

                at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

                at java.lang.Thread.run(Thread.java:745)

 

Found 1 deadlock.

 

We are not setting any slot sharing parameters since this batch based processing so it uses the default (and there don’t seem to be any options available to manipulate slot sharing for non-streaming).

If we disable slot sharing (assuming it will be through some config across the job) wouldn’t the job become relatively more slower?

 

Thanks,

Krishna.

 

From: Zhijiang(wangzhijiang999) <[hidden email]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Narayanaswamy, Krishna [Tech] <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email]
Subject:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

There actually exists this deadlock for special scenarios.

 

Before fixing the bug, we can avoid this issue by not deploying the map and sink tasks in the same task manager to work around.

Krishna, do you share the slot for these two tasks? If so, you can set disable slot sharing for this job.

 

Or I guess we can set the ExecutionMode#PIPELINED_FORCED to not generate blocking result partition to avoid this issue temporarily.

 

Best,

Zhijiang

 

------------------------------------------------------------------

发件人:Piotr Nowojski <[hidden email]>

发送时间:2018104(星期四) 21:54

收件人:Aljoscha Krettek <[hidden email]>

抄 送:"Narayanaswamy, Krishna" <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>

主 题:Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi,

 

Thanks for reporting the problem. This bug was previously unknown to us. I have created a jira ticket for this bug:

 

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t know if there is some hot fix or anything that can at least mitigate/decrease the probability of the bug for you until we fix it properly. 

 

Piotrek

 

On 4 Oct 2018, at 13:55, Aljoscha Krettek <[hidden email]> wrote:

 

Hi,

 

this looks like a potential Flink bug. Looping in Nico and Piotr who have looked into that in the past. Could you please comment on that?

 

Best,

Aljoscha

 



RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna

Thanks Zhijiang.

 

We will try these deadlock use cases with a single-slot approach to see how they go. We will await the fix before using more slots on a single TM.

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause a deadlock in SpillableSubpartition, but this is a different place causing the issue.

 

When the source task is trying to release subpartition memory, and meanwhile another CoGroup task is submitted and triggers the source task to release its memory, a deadlock can occur.
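
In essence this is a classic lock-ordering problem between the buffer pool's segment queue and the subpartition's buffer queue. A minimal, self-contained sketch of the pattern (the names are illustrative, not the actual Flink classes; running it leaves both threads blocked forever):

    import java.util.ArrayDeque;

    public class LockOrderDeadlockSketch {
        // Stand-ins for the two monitors seen in the dump (illustrative names).
        private static final ArrayDeque<Object> poolSegments = new ArrayDeque<>();
        private static final ArrayDeque<Object> subpartitionBuffers = new ArrayDeque<>();

        public static void main(String[] args) {
            // "Source" path: holds the pool lock, then wants the subpartition lock.
            Thread source = new Thread(() -> {
                synchronized (poolSegments) {
                    pause(); // widen the race window
                    synchronized (subpartitionBuffers) { /* release memory */ }
                }
            });
            // "CoGroup" path: holds the subpartition lock, then wants the pool lock.
            Thread coGroup = new Thread(() -> {
                synchronized (subpartitionBuffers) {
                    pause();
                    synchronized (poolSegments) { /* recycle buffer */ }
                }
            });
            source.start();
            coGroup.start(); // each thread now waits on the other's monitor: deadlock
        }

        private static void pause() {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
        }
    }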

 

I will create a jira ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM; then one task can never trigger another task in the same TM to release memory. Alternatively, you could increase the network buffer settings as a workaround, but I am not sure that would work for your case, because it depends on the total data size the source produces.
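
For reference, the corresponding flink-conf.yaml entries would look roughly like this (key names as in Flink 1.6; the buffer values are purely illustrative and would need tuning to the data volume the sources produce):

    # one slot per TaskManager, so no two tasks share a TM's buffer pools
    taskmanager.numberOfTaskSlots: 1

    # alternatively, enlarge the network buffer pool (values in bytes)
    taskmanager.network.memory.fraction: 0.2
    taskmanager.network.memory.min: 134217728
    taskmanager.network.memory.max: 2147483648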

 

Best,

Zhijiang


Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
I have created the jira [1] for it; you can monitor it for progress.

In addition, the SpillableSubpartition will be abandoned from Flink 1.9, and Stephan has already implemented a new BoundedBlockingSubpartition to replace it. Of course we will still provide support for fixing the existing bugs in previous Flink versions.

[1] https://issues.apache.org/jira/browse/FLINK-12544

Best,
Zhijiang


RE: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna
In reply to this post by Zhijiang(wangzhijiang999)

We started running jobs using single-slot task managers, which seemed to be OK for the past couple of days, but this morning we are seeing these deadlocks even with 1 slot. Is there something else we could try?

 

Thanks,

Krishna.

 


Re: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
Hi Krishna,

Could you show me or attach the jstack for the single slot case? Or is it the same jstack as before?

Best,
Zhijiang
------------------------------------------------------------------
From:Narayanaswamy, Krishna <[hidden email]>
Send Time:2019年5月21日(星期二) 19:50
To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>
Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks
 

We started to run jobs using the single slotted task managers which seemed to be ok for the past couple of days, but today morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out?

 

Thanks,

Krishna.

 

From: Narayanaswamy, Krishna [Tech]
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: RE:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

 

We will try these deadlock usecases with a single slot approach to see how they go. Will await the fix to start using more slots on the single TM.

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

I have already analyzed this deadlock case based on the code. FLINK-10491 already fixed one place in SpillableSubpartition that could cause a deadlock, but this issue is triggered from a different place.

 

When a source task is trying to release subpartition memory and, at the same time, another CoGroup task is submitted and triggers the source task to release its memory, a deadlock can occur.
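
Stripped to its essence, this is a lock-order inversion: one thread holds the subpartition queue's monitor and then needs the buffer pool's, while the other holds the pool's and then needs the subpartition's. A minimal stand-alone illustration (hypothetical locks and names, not the actual Flink code):

    import java.util.ArrayDeque;

    // Two threads take the same two monitors in opposite order and block
    // forever -- the same shape as the CoGroup/DataSource stacks in this thread.
    public class LockOrderInversion {
        // stand-ins for the subpartition queue and the buffer-pool queue
        private static final ArrayDeque<Integer> subpartition = new ArrayDeque<>();
        private static final ArrayDeque<Integer> bufferPool = new ArrayDeque<>();

        public static void main(String[] args) {
            Thread writer = new Thread(() -> {
                synchronized (subpartition) {      // adding a buffer locks the subpartition ...
                    pause();
                    synchronized (bufferPool) {    // ... then recycles into the pool
                        bufferPool.offer(1);
                    }
                }
            });
            Thread resizer = new Thread(() -> {
                synchronized (bufferPool) {        // resizing locks the pool ...
                    pause();
                    synchronized (subpartition) {  // ... then asks the subpartition to release memory
                        subpartition.offer(1);
                    }
                }
            });
            writer.start();
            resizer.start();                       // jstack now reports "Found 1 deadlock."
        }

        private static void pause() {
            try { Thread.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        }
    }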

 

I will create a JIRA ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking result partition type, the simplest way to avoid this is to configure only one slot per TM; then one task can never trigger another task in the same TM to release memory. Alternatively, you could increase the network buffer settings as a workaround, but I am not sure that would help in your case, because it depends on the total data size the source produces.
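
As a rough sketch, those two workarounds map to flink-conf.yaml entries like the following (config, not code; the keys are the Flink 1.6 names and the sizes are purely illustrative, not tuned recommendations):

    # Workaround 1: one slot per TaskManager, so no two tasks in the same
    # TM can trigger each other to release memory.
    taskmanager.numberOfTaskSlots: 1

    # Workaround 2 (alternative): give the network stack more buffer memory
    # (sizes in bytes, illustrative only).
    taskmanager.network.memory.fraction: 0.2
    taskmanager.network.memory.min: 268435456
    taskmanager.network.memory.max: 1073741824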

 

Best,

Zhijiang

------------------------------------------------------------------
From: Narayanaswamy, Krishna <[hidden email]>
Send Time: 2019-05-17 (Friday) 17:37
To: Zhijiang(wangzhijiang999) <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>
Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We see that JIRA issue (FLINK-10491) is fixed and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below -

 

Java stack information for the threads listed above:
===================================================
"CoGroup (2/2)":
                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
                at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
                - locked <0x000000063c785350> (a java.lang.Object)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
                at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
                - locked <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
                at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
                at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
                at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
                at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

 

We are not setting any slot-sharing parameters since this is batch-based processing, so it uses the default (and there do not seem to be any options available to manipulate slot sharing for non-streaming jobs).

If we disable slot sharing (assuming it would be some job-wide config), wouldn't the job become relatively slower?

 

Thanks,

Krishna.

 

From: Zhijiang(wangzhijiang999) <[hidden email]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Narayanaswamy, Krishna [Tech] <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

This deadlock does indeed exist in certain special scenarios.

 

Until the bug is fixed, this issue can be avoided by not deploying the map and sink tasks in the same task manager.

Krishna, do you share a slot between these two tasks? If so, you can disable slot sharing for this job.

 

Alternatively, I guess you can set ExecutionMode#PIPELINED_FORCED so that no blocking result partitions are generated, which avoids this issue temporarily.
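
As a sketch, setting that mode on a batch job looks roughly like this (standard DataSet API; the tiny example pipeline is just a placeholder):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PipelinedForcedJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Force pipelined data exchanges so no blocking (spillable)
            // result partitions are created.
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);

            env.fromElements(1, 2, 3)
               .map((Integer x) -> x * 2).returns(Integer.class)
               .print();
        }
    }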

 

Best,

Zhijiang

 

------------------------------------------------------------------
From: Piotr Nowojski <[hidden email]>
Sent: 2018-10-04 (Thursday) 21:54
To: Aljoscha Krettek <[hidden email]>
Cc: "Narayanaswamy, Krishna" <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi,

 

Thanks for reporting the problem. This bug was previously unknown to us. I have created a JIRA ticket for it:

 

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t know if there is some hot fix or anything that can at least mitigate/decrease the probability of the bug for you until we fix it properly. 

 

Piotrek

 

On 4 Oct 2018, at 13:55, Aljoscha Krettek <[hidden email]> wrote:

 

Hi,

 

this looks like a potential Flink bug. Looping in Nico and Piotr who have looked into that in the past. Could you please comment on that?

 

Best,

Aljoscha

 


 

 



RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Narayanaswamy, Krishna

Hi Zhijiang,

 

I couldn't get the jstack due to some constraints this time around; I will try to capture it when it next occurs. From the console/logs, though, it appears to be the same as the 2-slot case: the DataSource is finishing up and a CoGroup is trying to move from DEPLOYING to RUNNING (and is stuck at DEPLOYING).

 

Thanks,

Krishna.

 


Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already use a single slot per TaskManager, right?

If it is the single-slot-per-TaskManager case, please attach the jstack when it occurs next time; otherwise it is not needed.

Best,
Zhijiang


Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
Yes, it is the same case as with multiple slots per TM. The source task and the CoGroup task are still in the same TM in this case. I think you might have slot sharing enabled, so they still run in the same slot in one TM.
BTW, the previous deadlock issue is already fixed on my side and is waiting for review at the moment. You could pick the code from the PR to verify the results if you like, and the next release, 1.8.1, might include this fix as well.

Best,
Zhijiang
------------------------------------------------------------------
From: Erai, Rahul <[hidden email]>
Send Time: 2019-06-04 (Tuesday) 15:50
To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>
Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Hello Zhijiang,

We have been seeing deadlocks with single-slot TMs as well. Attaching the thread dump as requested. It looks similar to what we had with multi-slot TMs.

Thanks,

Rahul

 

From: zhijiang <[hidden email]>
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already used the single slot per TaskManager, right?

 

If it is the case of single slot in TaskManager, you could attach the jstack when occurs next time, otherwise it is not needed.

 

Best,

Zhijiang

------------------------------------------------------------------

From:Narayanaswamy, Krishna <[hidden email]>

Send Time:2019522(星期三) 00:49

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi Zhijiang,

 

I couldn’t get the jstack due to some constraints this time around. Will try and get them when it occurs next. But from the looks of it from the console/logs it appears to be the same as the 2 slot cases. DataSource finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck at DEPLOYING)

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi  Krishna,

 

Could you show me or attach the jstack for the single slot case? Or is it the same jstack as before?

 

Best,

Zhijiang

------------------------------------------------------------------

From:Narayanaswamy, Krishna <[hidden email]>

Send Time:2019521(星期二) 19:50

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We started to run jobs using the single slotted task managers which seemed to be ok for the past couple of days, but today morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out?

 

Thanks,

Krishna.

 

From: Narayanaswamy, Krishna [Tech]
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

 

We will try these deadlock use cases with a single-slot approach and see how they go, and will await the fix before using more slots on a single TM.

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place in SpillableSubpartition that could cause a deadlock, but this issue is triggered from a different place.

 

When the source task is trying to release subpartition memory while another CoGroup task is submitted, which in turn triggers the source task to release its memory, it might cause a deadlock.
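In essence this is a classic lock-ordering deadlock. The following minimal Java sketch (not Flink code; all names are invented for illustration) reproduces the same shape: one thread holds the buffer pool's queue monitor and then asks for the subpartition's, while the other holds the subpartition's and asks for the pool's:

import java.util.ArrayDeque;

public class LockOrderDeadlockDemo {
    // Stand-ins for the two monitors seen in the stack traces:
    // the subpartition's buffer queue and the local buffer pool's queue.
    private static final ArrayDeque<Object> subpartitionBuffers = new ArrayDeque<>();
    private static final ArrayDeque<Object> bufferPoolQueue = new ArrayDeque<>();

    public static void main(String[] args) {
        // "Source"-like thread: holds the pool queue (requesting a segment),
        // then tries to lock the subpartition queue to release its memory.
        Thread source = new Thread(() -> {
            synchronized (bufferPoolQueue) {
                pause();
                synchronized (subpartitionBuffers) {
                    System.out.println("source released memory");
                }
            }
        }, "DataSource");

        // "CoGroup"-like thread: holds the subpartition queue (spilling buffers),
        // then tries to lock the pool queue to recycle a buffer -- the opposite order.
        Thread coGroup = new Thread(() -> {
            synchronized (subpartitionBuffers) {
                pause();
                synchronized (bufferPoolQueue) {
                    System.out.println("coGroup recycled buffer");
                }
            }
        }, "CoGroup");

        source.start();
        coGroup.start(); // both threads park forever; jstack reports "Found 1 deadlock."
    }

    // Widen the race window so the deadlock occurs reliably.
    private static void pause() {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}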

 

I will create a jira ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking type, the simple way to avoid this is to configure only one slot per TM; then one task can never trigger another task to release memory in the same TM. Alternatively, you could increase the network buffer settings as a workaround, but I am not sure that works for your case, because it depends on the total data size the source produces.
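For reference, both workarounds map onto standard flink-conf.yaml settings (a sketch with illustrative values; the exact sizes depend on the deployment):

taskmanager.numberOfTaskSlots: 1        # one slot per TM, so tasks never share a TM's buffer pools

# or give the network stack more headroom (Flink 1.6+ keys, min/max in bytes):
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.min: 268435456
taskmanager.network.memory.max: 1073741824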

 

Best,

Zhijiang

------------------------------------------------------------------

From: Narayanaswamy, Krishna <[hidden email]>

Send Time: May 17, 2019 (Friday) 17:37

To: Zhijiang(wangzhijiang999) <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We see this JIRA issue (FLINK-10491) as fixed, and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simple scenarios as well. Deadlock dump below:

 

Java stack information for the threads listed above:
===================================================
"CoGroup (2/2)":
                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)
                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"CoGroup (1/2)":
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)
                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)
                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)
                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)
                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)
                at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)
                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)
                - locked <0x000000063c785350> (a java.lang.Object)
                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)
                at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)
                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)
                - locked <0x000000062bf859b8> (a java.lang.Object)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)
                at java.lang.Thread.run(Thread.java:745)
"DataSource  (1/1)":
                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)
                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)
                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
                at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
                at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
                at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
                at java.lang.Thread.run(Thread.java:745)

Found 1 deadlock.

 

We are not setting any slot-sharing parameters, since this is batch-based processing, so it uses the default (and there don’t seem to be any options available to manipulate slot sharing for non-streaming jobs).

If we disable slot sharing (assuming that would be some job-wide config), wouldn’t the job become relatively slower?

 

Thanks,

Krishna.

 

From: Zhijiang(wangzhijiang999) <[hidden email]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Narayanaswamy, Krishna [Tech] <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

This deadlock does indeed exist for certain special scenarios.

 

Until the bug is fixed, we can avoid this issue by not deploying the map and sink tasks in the same task manager as a workaround.

Krishna, do you share the slot for these two tasks? If so, you can disable slot sharing for this job.

 

Or I guess we can set ExecutionMode#PIPELINED_FORCED so that no blocking result partitions are generated, to avoid this issue temporarily.
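For the DataSet API this switch is a one-liner on the ExecutionConfig. A minimal sketch (with the caveat that forcing pipelined exchanges everywhere can itself deadlock for branching-and-rejoining flows, which is why it is not the default):

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PipelinedForcedExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Force pipelined result partitions everywhere, so no blocking
        // (SpillableSubpartition-backed) partitions are produced for this job.
        env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();
    }
}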

 

Best,

Zhijiang

 

------------------------------------------------------------------

From: Piotr Nowojski <[hidden email]>

Send Time: October 4, 2018 (Thursday) 21:54

To: Aljoscha Krettek <[hidden email]>

Cc: "Narayanaswamy, Krishna" <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>

Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi,

 

Thanks for reporting the problem. This bug was previously unknown to us. I have created a jira ticket for this bug:

 

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t know if there is some hot fix or anything that can at least mitigate/decrease the probability of the bug for you until we fix it properly. 

 

Piotrek

 

On 4 Oct 2018, at 13:55, Aljoscha Krettek <[hidden email]> wrote:

 

Hi,

 

This looks like a potential Flink bug. Looping in Nico and Piotr, who have looked into this in the past. Could you please comment on that?

 

Best,

Aljoscha

 



Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Zhijiang(wangzhijiang999)
The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you can find the PR link in it.
------------------------------------------------------------------
From: Erai, Rahul <[hidden email]>
Send Time: June 4, 2019 (Tuesday) 18:19
To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>
Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Thanks Zhijiang.

Can you point us to the JIRA for your fix?

 

Regards,

-Rahul

 

From: zhijiang <[hidden email]>
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Yes, it is the same case as with multiple slots in a TM. The source task and the CoGroup task are still in the same TM in this case. I think you might have slot sharing enabled, so they are still running in the same slot in one TM.

BTW, the previous deadlock issue is already fixed on my side, and the fix is waiting for review atm. You could pick the code from the PR to verify the results if you like. And the next release, 1.8.1, might include this fix as well.
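For anyone who wants to try the fix ahead of a release, a typical way to pick up an Apache Flink PR is to fetch it from GitHub and build locally (a sketch; <PR_NUMBER> is a placeholder for the pull request linked from FLINK-12544):

git clone https://github.com/apache/flink.git
cd flink
git fetch origin pull/<PR_NUMBER>/head:flink-12544-fix   # GitHub exposes every PR as a fetchable ref
git checkout flink-12544-fix
mvn clean install -DskipTests                            # build distributable artifacts without running tests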

 

Best,

Zhijiang



RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Chan, Regina

Can we get this into Flink 1.6+ and released? What’s the plan for the next rev?

 

 

From: zhijiang <[hidden email]>
Sent: Tuesday, June 4, 2019 6:37 AM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you could find the PR link in it.

------------------------------------------------------------------

From:Erai, Rahul <[hidden email]>

Send Time:201964(星期二) 18:19

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

Can you point us to the JIRA for your fix?

 

Regards,

-Rahul

 

From: zhijiang <[hidden email]>
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Yes, it is the same case as multiple slots in TM. The source task and co-group task are still in the same TM in this case. I think you might enable slot sharing, so they are running still in the same slot in one TM.

BTW, the previous deadlock issue is already fixed on my side, and it is waiting for review atm. You could pick the code in PR to verfiy the results if you like. And the next release-1.8.1 might cover this fix as well.

 

Best,

Zhijiang

------------------------------------------------------------------

From:Erai, Rahul <[hidden email]>

Send Time:201964(星期二) 15:50

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hello Zhijiang,

We have been seeing deadlocks with single slot TMs as well. Attaching the thread dump as requested. Looks similar to what was had with multi-slots TMs.

Thanks,

Rahul

 

From: zhijiang <[hidden email]>
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already used the single slot per TaskManager, right?

 

If it is the case of single slot in TaskManager, you could attach the jstack when occurs next time, otherwise it is not needed.

 

Best,

Zhijiang

------------------------------------------------------------------

From:Narayanaswamy, Krishna <[hidden email]>

Send Time:2019522(星期三) 00:49

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi Zhijiang,

 

I couldn’t get the jstack due to some constraints this time around. Will try and get them when it occurs next. But from the looks of it from the console/logs it appears to be the same as the 2 slot cases. DataSource finishing up and CoGroup looking to move from DEPLOYING to RUNNING (and stuck at DEPLOYING)

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi  Krishna,

 

Could you show me or attach the jstack for the single slot case? Or is it the same jstack as before?

 

Best,

Zhijiang

------------------------------------------------------------------

From:Narayanaswamy, Krishna <[hidden email]>

Send Time:2019521(星期二) 19:50

To:zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We started to run jobs using the single slotted task managers which seemed to be ok for the past couple of days, but today morning we seem to be seeing these deadlocks even with 1 slot. Is there something else we could try out?

 

Thanks,

Krishna.

 

From: Narayanaswamy, Krishna [Tech]
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: RE:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

 

We will try these deadlock usecases with a single slot approach to see how they go. Will await the fix to start using more slots on the single TM.

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

I already analyzed out this deadlock case based on the codes. FLINK-10491 has already solved on place to cause deadlock in SpillableSubpartition, but this is a different place to cause this issue.

 

When source task is trying to release subpartition memory, meanwhile another CoGroup task is submitted to trigger source task to release its memory, then it might cause deadlock.

 

I would create a jira ticket for this issue and think how to solve it soon. Currently if you still want to use the blocking type, the simple way to avoid this is to make only one slot in TM, then there never happen one task triggers another task to release memory in the same TM. Or you could increase the network buffer setting to work aournd, but not sure this way could work for your case because it is up to the total data size the source produced.

 

Best,

Zhijiang

------------------------------------------------------------------

From:Narayanaswamy, Krishna <[hidden email]>

Send Time:2019517(星期五) 17:37

To:Zhijiang(wangzhijiang999) <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc:Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject:RE: 回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We see this JIRA issue (FLINK-10491) as fixed and the fix is present in Flink v1.6.4 which we are using now but the problem now seems to come up for relatively simpler scenarios as well. Deadlock dump below -

 

Java stack information for the threads listed above:

===================================================

"CoGroup (2/2)":

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)

                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"CoGroup (1/2)":

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)

                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)

                at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)

                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)

                - locked <0x000000063c785350> (a java.lang.Object)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)

                - locked <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"DataSource  (1/1)":

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)

                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)

                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)

                at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)

                at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)

                at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

                at java.lang.Thread.run(Thread.java:745)

 

Found 1 deadlock.

 

We are not setting any slot sharing parameters since this batch based processing so it uses the default (and there don’t seem to be any options available to manipulate slot sharing for non-streaming).

If we disable slot sharing (assuming it will be through some config across the job) wouldn’t the job become relatively more slower?

 

Thanks,

Krishna.

 

From: Zhijiang(wangzhijiang999) <[hidden email]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Narayanaswamy, Krishna [Tech] <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email]
Subject:
回复:Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

There actually exists this deadlock for special scenarios.

 

Before fixing the bug, we can avoid this issue by not deploying the map and sink tasks in the same task manager to work around.

Krishna, do you share the slot for these two tasks? If so, you can set disable slot sharing for this job.

 

Or I guess we can set the ExecutionMode#PIPELINED_FORCED to not generate blocking result partition to avoid this issue temporarily.

 

Best,

Zhijiang

 

------------------------------------------------------------------

发件人:Piotr Nowojski <[hidden email]>

发送时间:2018104(星期四) 21:54

收件人:Aljoscha Krettek <[hidden email]>

抄 送:"Narayanaswamy, Krishna" <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>

主 题:Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi,

 

Thanks for reporting the problem. This bug was previously unknown to us. I have created a jira ticket for this bug:

 

Unfortunately I’m not familiar with running batch jobs in Flink, so I don’t know if there is some hot fix or anything that can at least mitigate/decrease the probability of the bug for you until we fix it properly. 

 

Piotrek

 


Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

Chesnay Schepler
1.6.4 was the last bugfix release in the 1.6 series.

On 05/06/2019 20:49, Chan, Regina wrote:

Can we get this into Flink 1.6+ and released? What’s the plan for the next rev?

 

 

From: zhijiang [hidden email]
Sent: Tuesday, June 4, 2019 6:37 AM
To: Aljoscha Krettek [hidden email]; Piotr Nowojski [hidden email]; Narayanaswamy, Krishna [Tech] [hidden email]; Erai, Rahul [Tech] [hidden email]
Cc: Nico Kruber [hidden email]; [hidden email]; Chan, Regina [Tech] [hidden email]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

The jira is https://issues.apache.org/jira/browse/FLINK-12544 and you can find the PR link in it.

------------------------------------------------------------------

From: Erai, Rahul <[hidden email]>

Send Time: June 4, 2019 (Tuesday) 18:19

To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

Can you point us to the JIRA for your fix?

 

Regards,

-Rahul

 

From: zhijiang <[hidden email]>
Sent: Tuesday, June 4, 2019 1:26 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Yes, it is the same case as with multiple slots per TM. The source task and the co-group task are still in the same TM in this case. I think you might have slot sharing enabled, so they are still running in the same slot in one TM.

BTW, the previous deadlock issue is already fixed on my side and is waiting for review at the moment. You could pick up the code in the PR to verify the results if you like. The next release, 1.8.1, might cover this fix as well.

 

Best,

Zhijiang

------------------------------------------------------------------

From: Erai, Rahul <[hidden email]>

Send Time: June 4, 2019 (Tuesday) 15:50

To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; "Narayanaswamy, Krishna" <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hello Zhijiang,

We have been seeing deadlocks with single-slot TMs as well. Attaching the thread dump as requested. It looks similar to what we had with multi-slot TMs.

Thanks,

Rahul

 

From: zhijiang <[hidden email]>
Sent: Wednesday, May 22, 2019 7:56 AM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

If it is still the case of multiple slots in one TaskManager, it is the same as before. But you said you already use a single slot per TaskManager, right?

 

If it is the case of a single slot per TaskManager, please attach the jstack when it occurs next time; otherwise it is not needed.
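
For reference, a thread dump with lock-ownership details can usually be captured with the standard JDK tools. A rough sketch, assuming shell access to the TM host and a JDK on the PATH (the grep filter is only illustrative):

    # find the TaskManager JVM's pid, then dump all threads with lock details
    jps -l | grep -i taskmanager
    jstack -l <pid> > taskmanager-threads.txt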

 

Best,

Zhijiang

------------------------------------------------------------------

From: Narayanaswamy, Krishna <[hidden email]>

Send Time: May 22, 2019 (Wednesday) 00:49

To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi Zhijiang,

 

I couldn’t get the jstack due to some constraints this time around; I will try to get it when it occurs next. But from the looks of the console/logs it appears to be the same as the 2-slot case: the DataSource finishing up and the CoGroup trying to move from DEPLOYING to RUNNING (and stuck at DEPLOYING).

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Tuesday, May 21, 2019 7:38 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Hi Krishna,

 

Could you show me or attach the jstack for the single slot case? Or is it the same jstack as before?

 

Best,

Zhijiang

------------------------------------------------------------------

From: Narayanaswamy, Krishna <[hidden email]>

Send Time: May 21, 2019 (Tuesday) 19:50

To: zhijiang <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We started running jobs with the single-slot task managers, which seemed OK for the past couple of days, but this morning we are seeing these deadlocks even with 1 slot. Is there something else we could try out?

 

Thanks,

Krishna.

 

From: Narayanaswamy, Krishna [Tech]
Sent: Friday, May 17, 2019 4:20 PM
To: 'zhijiang' <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

Thanks Zhijiang.

 

We will try these deadlock use cases with a single-slot approach to see how they go, and will await the fix before using more slots on a single TM.

 

Thanks,

Krishna.

 

From: zhijiang <[hidden email]>
Sent: Friday, May 17, 2019 4:05 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>; Narayanaswamy, Krishna [Tech] <[hidden email]>
Cc: Nico Kruber <[hidden email]>; [hidden email]; Chan, Regina [Tech] <[hidden email]>; Erai, Rahul [Tech] <[hidden email]>
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

I have analyzed this deadlock case based on the code. FLINK-10491 already fixed one place that could cause a deadlock in SpillableSubpartition, but this issue is triggered in a different place.

 

When the source task is trying to release subpartition memory and, at the same time, another CoGroup task is submitted and triggers the source task to release its memory, a deadlock can occur.
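
To illustrate the pattern, here is a hypothetical, minimal Java sketch (not Flink code; all names are invented): two threads acquire the same two monitors in opposite order, just like the task threads in the dumps above.

    import java.util.ArrayDeque;

    // Simplified stand-ins for the two ArrayDeque monitors in the dumps.
    public class LockOrderDeadlockDemo {
        private static final ArrayDeque<Object> subpartitionBuffers = new ArrayDeque<>();
        private static final ArrayDeque<Object> bufferPoolQueue = new ArrayDeque<>();

        public static void main(String[] args) {
            // models the task spilling buffers: subpartition lock, then pool lock
            Thread spillingTask = new Thread(() -> {
                synchronized (subpartitionBuffers) {
                    pause();
                    synchronized (bufferPoolQueue) {
                        System.out.println("spilling task recycled its buffers");
                    }
                }
            });
            // models the task triggering a release: pool lock, then subpartition lock
            Thread releasingTask = new Thread(() -> {
                synchronized (bufferPoolQueue) {
                    pause();
                    synchronized (subpartitionBuffers) {
                        System.out.println("releasing task freed partition memory");
                    }
                }
            });
            spillingTask.start();
            releasingTask.start(); // with pause(), this deadlocks almost every run
        }

        private static void pause() {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
        }
    }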

 

I will create a jira ticket for this issue and think about how to solve it soon. Currently, if you still want to use the blocking result partition type, the simple way to avoid this is to configure only one slot per TM; then one task can never trigger another task in the same TM to release memory. Or you could increase the network buffer settings to work around it, but I am not sure this would work for your case, because it depends on the total data size the source produces.
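
As a rough sketch of that workaround in flink-conf.yaml (these keys exist in Flink 1.6; the buffer values are purely illustrative and would need tuning to your TM size):

    taskmanager.numberOfTaskSlots: 1
    # optionally give the network stack more memory (byte values, illustrative only)
    taskmanager.network.memory.fraction: 0.2
    taskmanager.network.memory.min: 268435456
    taskmanager.network.memory.max: 2147483648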

 

Best,

Zhijiang

------------------------------------------------------------------

From: Narayanaswamy, Krishna <[hidden email]>

Send Time: May 17, 2019 (Friday) 17:37

To: Zhijiang(wangzhijiang999) <[hidden email]>; Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>

Cc: Nico Kruber <[hidden email]>; [hidden email] <[hidden email]>; "Chan, Regina" <[hidden email]>; "Erai, Rahul" <[hidden email]>

Subject: RE: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

We see this JIRA issue (FLINK-10491) marked as fixed, and the fix is present in Flink v1.6.4, which we are using now, but the problem now seems to come up for relatively simple scenarios as well. Deadlock dump below -

 

Java stack information for the threads listed above:

===================================================

"CoGroup (2/2)":

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:213)

                - waiting to lock <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"CoGroup (1/2)":

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:277)

                - waiting to lock <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:172)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:95)

                at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:84)

                at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:147)

                at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:121)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:274)

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:239)

                - locked <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:375)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:408)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:297)

                - locked <0x000000063c785350> (a java.lang.Object)

                at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:259)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:272)

                at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224)

                - locked <0x000000062bf859b8> (a java.lang.Object)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:614)

                at java.lang.Thread.run(Thread.java:745)

"DataSource  (1/1)":

                at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:227)

                - waiting to lock <0x000000063fdf4ac8> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:371)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:257)

                - locked <0x000000063fdf4888> (a java.util.ArrayDeque)

                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)

                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)

                at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)

                at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)

                at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)

                at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)

                at java.lang.Thread.run(Thread.java:745)

 

Found 1 deadlock.

 

We are not setting any slot sharing parameters, since this is batch-based processing, so it uses the default (and there don’t seem to be any options available to manipulate slot sharing for non-streaming jobs).

If we disable slot sharing (assuming it would be through some job-wide config), wouldn’t the job become relatively slower?

 

Thanks,

Krishna.

 

From: Zhijiang(wangzhijiang999) <[hidden email]>
Sent: Monday, October 08, 2018 1:39 PM
To: Aljoscha Krettek <[hidden email]>; Piotr Nowojski <[hidden email]>
Cc: Narayanaswamy, Krishna [Tech] <[hidden email]>; Nico Kruber <[hidden email]>; [hidden email]
Subject: Re: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks

 

This deadlock does actually exist in special scenarios.

 

Until the bug is fixed, we can work around this issue by not deploying the map and sink tasks in the same task manager.

Krishna, do you share the slot between these two tasks? If so, you can disable slot sharing for this job.

 

Or I guess we can set ExecutionMode#PIPELINED_FORCED so that no blocking result partitions are generated, to avoid this issue temporarily.
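
A minimal sketch of that setting for a DataSet job (assuming the standard ExecutionEnvironment entry point; the class name is invented):

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class PipelinedForcedJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // force pipelined data exchanges so that no blocking (spillable)
            // result partitions are created
            env.getConfig().setExecutionMode(ExecutionMode.PIPELINED_FORCED);
            // ... build sources, transformations and sinks as before ...
            // env.execute("large job");
        }
    }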

 

Best,

Zhijiang

 
