Buffer stats when Back Pressure is high

Buffer stats when Back Pressure is high

Gagan Agrawal
Hi,
I want to understand whether any of the buffer stats help in debugging or validating that a downstream operator is slow when back pressure is high. Say I have operators A -> B, and A shows high back pressure, which indicates that something is wrong or slow on the B side and is slowing down operator A. However, when I look at buffers.inputQueueLength for operator B, it's 0. My understanding is that when B is processing slowly, its input buffer fills up with incoming messages, which ultimately blocks or slows down the upstream operator A. That doesn't seem to be happening in my case. Can someone shed some light on what the different buffer stats (e.g. buffers.inPoolUsage, buffers.inputQueueLength, numBuffersInLocalPerSecond, numBuffersInRemotePerSecond) should look like when a downstream operator is slow?

Gagan

Re: Buffer stats when Back Pressure is high

Zhijiang(wangzhijiang999)
Hi Gagan,

Which Flink version are you using? And have you checked buffers.inputQueueLength for all parallel subtasks of B that are connected to A? It may be that only one parallel subtask of B has a full input queue, which back-pressures A, while the input queues of the other subtasks of B are empty.
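As a rough illustration of this check, the sketch below takes one buffers.inputQueueLength sample per parallel subtask of B and flags the subtasks whose queue is backed up. The class name, the sample values, and the threshold are hypothetical; how the samples are collected (web UI, metrics reporter) is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: flags subtasks of B whose input queue is backed up. */
class QueueSkewCheck {

    /**
     * @param queueLengths one buffers.inputQueueLength sample per subtask index
     * @param threshold    assumed cut-off; queue lengths at or above it count as backed up
     * @return indices of the subtasks at or above the threshold
     */
    static List<Integer> backedUpSubtasks(int[] queueLengths, int threshold) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < queueLengths.length; i++) {
            if (queueLengths[i] >= threshold) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up samples: most subtasks are idle, two are saturated.
        int[] samples = {0, 0, 42, 0, 1, 0, 45, 0};
        System.out.println(backedUpSubtasks(samples, 40)); // prints [2, 6]
    }
}
```

A result with only a few indices, while the remaining subtasks sit near 0, points to skew rather than a uniformly slow operator.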

Best,
Zhijiang

------------------------------------------------------------------
From: Gagan Agrawal <[hidden email]>
Send Time: Monday, January 7, 2019, 12:06
To: user <[hidden email]>
Subject: Buffer stats when Back Pressure is high

Re: Buffer stats when Back Pressure is high

Gagan Agrawal
Flink version is 1.7.
Thanks, Zhijiang, for the pointer. Initially I was checking only a few subtasks. However, I just checked all of them and found a couple with a queue length of 40+, which seems to be due to skew in the data. Are there any general guidelines on how to handle skewed data? In my case I take the union of 2 streams (one low-volume enrichment stream and one high-volume regular data stream) and then keyBy on the enrichment id, followed by a custom stateful process function. I see that 30% of my data stream records have the same enrichment id and hence go to the same task, which results in skew. Any pointers on how to handle skew while doing keyBy would be of great help.

Gagan

In reply to zhijiang <[hidden email]> (Mon, Jan 7, 2019 at 3:25 PM)

Re: Buffer stats when Back Pressure is high

Timo Walther
Hi Gagan,

A typical solution to such a problem is to introduce an artificial key (enrichment id + some additional suffix). You can then keyBy on this artificial key and thus spread the workload more evenly. Of course, you need to make sure that records of the second stream are duplicated to all operators with the same artificial key.
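A minimal sketch of the artificial key idea in plain Java, assuming the "second stream" is the low-volume enrichment stream. The class name, the "#" separator, and the number of suffixes are hypothetical choices, not something prescribed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch of key salting; names and the salt count are assumptions. */
class SaltedKeys {

    /** Assumed number of suffixes; each enrichment id is split into this many keys. */
    static final int NUM_SALTS = 8;

    /** Artificial key = enrichment id + "#" + suffix. */
    static String saltedKey(String enrichmentId, int salt) {
        return enrichmentId + "#" + salt;
    }

    /** High-volume data record: pick one suffix at random so the load spreads out. */
    static String keyForDataRecord(String enrichmentId) {
        return saltedKey(enrichmentId, ThreadLocalRandom.current().nextInt(NUM_SALTS));
    }

    /** Low-volume enrichment record: one copy per suffix so every salted key sees it. */
    static List<String> keysForEnrichmentRecord(String enrichmentId) {
        List<String> keys = new ArrayList<>(NUM_SALTS);
        for (int salt = 0; salt < NUM_SALTS; salt++) {
            keys.add(saltedKey(enrichmentId, salt));
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(keyForDataRecord("id-42"));        // e.g. id-42#3
        System.out.println(keysForEnrichmentRecord("id-42")); // [id-42#0, ..., id-42#7]
    }
}
```

In a Flink job the salt would have to be attached in a map or flatMap step before keyBy, because the key selector itself must return the same key every time it is called for a given record. Keyed state is then held per artificial key, so anything that must be combined across the original enrichment id needs a separate step.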

Depending on the frequency of the second stream, it might also be worth using a broadcast join that distributes the second stream to all operators, so that all operators can perform the enrichment step in a round-robin fashion.
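The broadcast variant can be pictured with the plain-Java simulation below. It is not Flink API code: the class and method names are hypothetical, and the parallel operators are modelled as a list of per-worker maps. Every enrichment record is copied to all workers, while data records are handed out round-robin regardless of their enrichment id.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java simulation of a broadcast join; not Flink API code. */
class BroadcastJoinSketch {

    /** One entry per parallel worker: its private copy of the enrichment data. */
    private final List<Map<String, String>> enrichmentPerWorker = new ArrayList<>();
    /** Number of data records each worker has processed. */
    final int[] processed;
    private int next = 0;

    BroadcastJoinSketch(int parallelism) {
        processed = new int[parallelism];
        for (int i = 0; i < parallelism; i++) {
            enrichmentPerWorker.add(new HashMap<>());
        }
    }

    /** Enrichment record: broadcast, so every worker stores its own copy. */
    void onEnrichment(String enrichmentId, String value) {
        for (Map<String, String> copy : enrichmentPerWorker) {
            copy.put(enrichmentId, value);
        }
    }

    /** Data record: sent to workers in round-robin order, regardless of its key. */
    String onData(String enrichmentId, String payload) {
        int worker = next;
        next = (next + 1) % processed.length;
        processed[worker]++;
        return payload + "|" + enrichmentPerWorker.get(worker).get(enrichmentId);
    }

    public static void main(String[] args) {
        BroadcastJoinSketch join = new BroadcastJoinSketch(4);
        join.onEnrichment("hot-id", "meta");
        for (int i = 0; i < 8; i++) {
            join.onData("hot-id", "record-" + i);
        }
        System.out.println(Arrays.toString(join.processed)); // prints [2, 2, 2, 2]
    }
}
```

Even though all eight records carry the same enrichment id, each worker handles two of them. The cost is that every worker keeps a full copy of the enrichment data, which is why this fits best when the second stream is small. In Flink this roughly corresponds to the broadcast state pattern.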

Regards,
Timo

In reply to Gagan Agrawal (Jan 7, 2019 at 14:45)

Re: Buffer stats when Back Pressure is high

Gagan Agrawal
Thanks, Timo, for the suggested solution. We will go with the artificial key idea for our use case.

Gagan 

In reply to Timo Walther <[hidden email]> (Mon, Jan 7, 2019 at 10:21 PM)