how to understand the flink flow control

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

how to understand the flink flow control

Zhijiang(wangzhijiang999)

As said in apache page, Flink's streaming runtime has natural flow control: Slow downstream operators backpressure faster upstream operators.

How to understand the flink natural flow control? 

As i know, heron has the backpressure mechanism, if some tasks process slowly, it will stop reading from source and notify other tasks to stop reading from source.

In flink, if the producer task process quickly, it will emit the results to consumer. So the buffer in InputChannel of consumer wil be filled up, if the consumer process slowly, how to control the upstream flow?


Thank you for any suggestions in advance!



Best wishes,


Zhijiang Wang

Reply | Threaded
Open this post in threaded view
|

Re: how to understand the flink flow control

Ufuk Celebi
Hey Zhijiang Wang,

I will update the docs next week with more information. The short version is that flow control happens via the buffer pools that Flink uses for produced and consumed intermediate results.

The slightly ;) longer version:

Each task has buffer pools. The size of these buffer pools depends on multiple things (per task manager):
- the configured number of network buffers [default 2048]
- the number of tasks running
- the number of consumed and produced outputs

Each consumed input (if you are looking into the code: each SingleInputGate) has a buffer pool associated with it and each produced intermediate result (see IntermediateResultPartition in the code) as well.

Each produced record is serialized into a buffer of the respective buffer pool of the produced result partition and dispatched to the consumer (either local or remote via network). After the buffer has been consumed it is recycled to the pool and can be used for the outstanding records. If the producer is faster than the consumer, these buffer will take longer to be available again and the producer will slow down by waiting on a buffer.

For local exchange the buffer is consumed as soon as the local consumer has deserialized the records and for remote exchange as soon as the network layer has dispatched the buffer.

For the input side, there is a similar mechanism. The network layer receives a buffer and copies it to the buffer pool of the respective input gate and queues the filled buffer to the (remote) input channel. If there is no buffer available at the input gate, the TCP channel is not read until a buffer is available again. This backpressures remote receivers, because their output buffers are not dispatched and cannot be recycled.


I hope this helps. If you have further questions, just post them here. I will update the docs with some figures, so this will be easier to follow.

– Ufuk

On 07 Aug 2015, at 03:37, wangzhijiang999 <[hidden email]> wrote:

> As said in apache page, Flink's streaming runtime has natural flow control: Slow downstream operators backpressure faster upstream operators.
> How to understand the flink natural flow control?
> As i know, heron has the backpressure mechanism, if some tasks process slowly, it will stop reading from source and notify other tasks to stop reading from source.
> In flink, if the producer task process quickly, it will emit the results to consumer. So the buffer in InputChannel of consumer wil be filled up, if the consumer process slowly, how to control the upstream flow?
>
> Thank you for any suggestions in advance!
>
>
> Best wishes,
>
> Zhijiang Wang

Reply | Threaded
Open this post in threaded view
|

答复:how to understand the flink flow control

Zhijiang(wangzhijiang999)
In reply to this post by Zhijiang(wangzhijiang999)

Hi Ufuk,


       Thank you for your detail and clear explaination!  I reviewed the code based on your info and got it clearly.


For local transfer:

  1. When producer emits results to ResultPartition, it will request  Buffer from the pool. If there  are no available Buffer, it will wait .

  2. The ResultPartition of producer is the InputGate of Consumer, when consumer read buffer from InputGate and deserializer the buffer, the Buffer will be recycled ,so the producer can request Buffer again.

  3. So if the consumer slows, the Buffer in the ResultPartition of producer can not be recycled quickly, resulting in producer has no available Buffer to emit data.

  4. If the producer waits the available Buffer to emit, it can not process elements and read next data from its InputGate, resulting in slowing the producer.

For remote transfer:

     The InputGate and ResultPartition for each task are separate

     1. For producer the Buffer in ResultPartition will be recycled when producer write them to the channel.

     2. For consumer the Buffer will be recycled when consumer read it from InputGate and deserializer it . The consumer need to request Buffer when reading data from channel     and put them to InputGate.

     3.  When consumer slows and there are no available Buffer , the consumer will not read data from channel, so it will affect the producer writing data to channel and  the producer will have no available Buffer to emit result at last resulting in slowing producer.


My understanding is right?   Looking forward to your docs!


Best wishes,


Zhijiang Wang


------------------------------------------------------------------

发件人:Ufuk Celebi <[hidden email]>

发送时间:2015年8月7日(星期五) 21:07

收件人:user <[hidden email]>,wangzhijiang999 <[hidden email]>

主 题:Re: how to understand the flink flow control


Hey Zhijiang Wang,


I will update the docs next week with more information. The short version is that flow control happens via the buffer pools that Flink uses for produced and consumed intermediate results.


The slightly ;) longer version:


Each task has buffer pools. The size of these buffer pools depends on multiple things (per task manager):

- the configured number of network buffers [default 2048]

- the number of tasks running

- the number of consumed and produced outputs


Each consumed input (if you are looking into the code: each SingleInputGate) has a buffer pool associated with it and each produced intermediate result (see IntermediateResultPartition in the code) as well.


Each produced record is serialized into a buffer of the respective buffer pool of the produced result partition and dispatched to the consumer (either local or remote via network). After the buffer has been consumed it is recycled to the pool and can be used for the outstanding records. If the producer is faster than the consumer, these buffer will take longer to be available again and the producer will slow down by waiting on a buffer.


For local exchange the buffer is consumed as soon as the local consumer has deserialized the records and for remote exchange as soon as the network layer has dispatched the buffer.


For the input side, there is a similar mechanism. The network layer receives a buffer and copies it to the buffer pool of the respective input gate and queues the filled buffer to the (remote) input channel. If there is no buffer available at the input gate, the TCP channel is not read until a buffer is available again. This backpressures remote receivers, because their output buffers are not dispatched and cannot be recycled.



I hope this helps. If you have further questions, just post them here. I will update the docs with some figures, so this will be easier to follow.


– Ufuk


On 07 Aug 2015, at 03:37, wangzhijiang999 <[hidden email]> wrote:


> As said in apache page, Flink's streaming runtime has natural flow control: Slow downstream operators backpressure faster upstream operators.

> How to understand the flink natural flow control? 

> As i know, heron has the backpressure mechanism, if some tasks process slowly, it will stop reading from source and notify other tasks to stop reading from source.

> In flink, if the producer task process quickly, it will emit the results to consumer. So the buffer in InputChannel of consumer wil be filled up, if the consumer process slowly, how to control the upstream flow?

> Thank you for any suggestions in advance!

> Best wishes,

> Zhijiang Wang


Reply | Threaded
Open this post in threaded view
|

Re: 答复:how to understand the flink flow control

Ufuk Celebi
Good to hear. Answers to your questions are inline.

I think we should move the discussion to the [hidden email] list though if there are more questions.

On Mon, Aug 10, 2015 at 9:29 AM, wangzhijiang999 <[hidden email]> wrote:

Hi Ufuk,


       Thank you for your detail and clear explaination!  I reviewed the code based on your info and got it clearly.


For local transfer:

  1. When producer emits results to ResultPartition, it will request  Buffer from the pool. If there  are no available Buffer, it will wait .


Yes, the buffer request is blocking.
 
  1. The ResultPartition of producer is the InputGate of Consumer, when consumer read buffer from InputGate and deserializer the buffer, the Buffer will be recycled ,so the producer can request Buffer again.


The producer never requests a buffer from the input gate, but from its own pool. If the input gate is slow to recycle its buffers, the producer will become slow as well. But you are right that the producer will not be able to request a buffer if the input gate never recycles them.
 
  1. So if the consumer slows, the Buffer in the ResultPartition of producer can not be recycled quickly, resulting in producer has no available Buffer to emit data.

  2. If the producer waits the available Buffer to emit, it can not process elements and read next data from its InputGate, resulting in slowing the producer.

Yes. The task thread is both consuming and producing data. So if its blocked on the output side, the input is not consumed, which propagates back pressure further.
 

For remote transfer:

     The InputGate and ResultPartition for each task are separate

     1. For producer the Buffer in ResultPartition will be recycled when producer write them to the channel.

     2. For consumer the Buffer will be recycled when consumer read it from InputGate and deserializer it . The consumer need to request Buffer when reading data from channel     and put them to InputGate.

     3.  When consumer slows and there are no available Buffer , the consumer will not read data from channel, so it will affect the producer writing data to channel and  the producer will have no available Buffer to emit result at last resulting in slowing producer.


Yes but there is some logic involved to stop reading from the actual TCP channel to back pressure the remote producer. Essentially the selector read event is unsubscribed and on the output side there is Netty's watermark mechanism, which prevents further data from being written to the TCP channel.


Reply | Threaded
Open this post in threaded view
|

答复:答复:how to understand the flink flow control

Zhijiang(wangzhijiang999)
In reply to this post by Zhijiang(wangzhijiang999)

Hi Ufuk,


          Thank you for your instant reply and I have got the basic idea. 


           I am researching flink code now, and  if I have further specific questions next time, I will send to [hidden email] .


Best wishes,


Zhijiang Wang


------------------------------------------------------------------

发件人:Ufuk Celebi <[hidden email]>

发送时间:2015年8月10日(星期一) 17:13

收件人:user <[hidden email]>,wangzhijiang999 <[hidden email]>

主 题:Re: 答复:how to understand the flink flow control


Good to hear. Answers to your questions are inline.


I think we should move the discussion to the [hidden email] list though if there are more questions.


On Mon, Aug 10, 2015 at 9:29 AM, wangzhijiang999 <[hidden email]> wrote:

Hi Ufuk,


       Thank you for your detail and clear explaination!  I reviewed the code based on your info and got it clearly.


For local transfer:

  1. When producer emits results to ResultPartition, it will request  Buffer from the pool. If there  are no available Buffer, it will wait .


Yes, the buffer request is blocking.

 

  1. The ResultPartition of producer is the InputGate of Consumer, when consumer read buffer from InputGate and deserializer the buffer, the Buffer will be recycled ,so the producer can request Buffer again.


The producer never requests a buffer from the input gate, but from its own pool. If the input gate is slow to recycle its buffers, the producer will become slow as well. But you are right that the producer will not be able to request a buffer if the input gate never recycles them.

 

  1. So if the consumer slows, the Buffer in the ResultPartition of producer can not be recycled quickly, resulting in producer has no available Buffer to emit data.

  2. If the producer waits the available Buffer to emit, it can not process elements and read next data from its InputGate, resulting in slowing the producer.

Yes. The task thread is both consuming and producing data. So if its blocked on the output side, the input is not consumed, which propagates back pressure further.

 

For remote transfer:

     The InputGate and ResultPartition for each task are separate

     1. For producer the Buffer in ResultPartition will be recycled when producer write them to the channel.

     2. For consumer the Buffer will be recycled when consumer read it from InputGate and deserializer it . The consumer need to request Buffer when reading data from channel     and put them to InputGate.

     3.  When consumer slows and there are no available Buffer , the consumer will not read data from channel, so it will affect the producer writing data to channel and  the producer will have no available Buffer to emit result at last resulting in slowing producer.


Yes but there is some logic involved to stop reading from the actual TCP channel to back pressure the remote producer. Essentially the selector read event is unsubscribed and on the output side there is Netty's watermark mechanism, which prevents further data from being written to the TCP channel.