Backpressure? for Batches

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Backpressure? for Batches

Darshan Singh
I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks
Reply | Threaded
Open this post in threaded view
|

回复:Backpressure? for Batches

Zhijiang(wangzhijiang999)
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks

Reply | Threaded
Open this post in threaded view
|

Re: Backpressure? for Batches

Darshan Singh
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks

Reply | Threaded
Open this post in threaded view
|

Re: Backpressure? for Batches

Chesnay Schepler
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is data to be consumed, i.e. one the first record was emitted by the previous operator. As such back-pressure exists in batch just like in streaming.

On 29.08.2018 11:39, Darshan Singh wrote:
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks


Reply | Threaded
Open this post in threaded view
|

Re: Backpressure? for Batches

Darshan Singh
Thanks, Now back to my question again. How can I say read at less speed from hdfs than my say map or group by can consume? Is there some sort of configuration which says read only 10000 rows and then stop and then reread etc. Otherwise source will keep on sending the data or keeping in some sort of buffers and will be OOM. Or setting different parallelism while reading is the way to handle this?

Thanks

On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler <[hidden email]> wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is data to be consumed, i.e. one the first record was emitted by the previous operator. As such back-pressure exists in batch just like in streaming.

On 29.08.2018 11:39, Darshan Singh wrote:
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks


Reply | Threaded
Open this post in threaded view
|

回复:Backpressure? for Batches

Zhijiang(wangzhijiang999)
chesnay is right. For batch job there are two ways for notifying consumable. One is the first record emitted by upstream and the other is the upstream finishes all the records (blocking mode).

For your case, the slow groupby node will trigger back-pressure and block the upstreams until source node. But it will not cause OOM for caching in-flight buffers normally because they are managed by framework and will not exceed unlimited, only if the netty buffer pool may cause that I experienced before.

One possible way to avoid backpressure is to increase the parallelism of slow node(groupby in your case) or decrease the parallelism of fast node(source in your case).

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 18:16
收件人:chesnay <[hidden email]>
抄 送:wangzhijiang999 <[hidden email]>; user <[hidden email]>
主 题:Re: Backpressure? for Batches

Thanks, Now back to my question again. How can I say read at less speed from hdfs than my say map or group by can consume? Is there some sort of configuration which says read only 10000 rows and then stop and then reread etc. Otherwise source will keep on sending the data or keeping in some sort of buffers and will be OOM. Or setting different parallelism while reading is the way to handle this?

Thanks

On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler <[hidden email]> wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is data to be consumed, i.e. one the first record was emitted by the previous operator. As such back-pressure exists in batch just like in streaming.

On 29.08.2018 11:39, Darshan Singh wrote:
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks



Reply | Threaded
Open this post in threaded view
|

Re: Backpressure? for Batches

Darshan Singh
Thanks,

I thought either Group by is causing the OOM but it is mentioned that sort will be spilled to disk so that there is no way for that to cause the OOM. So I was looking maybe due to back pressure some of data read from hdfs is kept in memory as it is not consumed and that is causing OOM.So it seems this is not possible as well so need to relook what could be causing the OOM.

Thanks

On Wed, Aug 29, 2018 at 12:41 PM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
chesnay is right. For batch job there are two ways for notifying consumable. One is the first record emitted by upstream and the other is the upstream finishes all the records (blocking mode).

For your case, the slow groupby node will trigger back-pressure and block the upstreams until source node. But it will not cause OOM for caching in-flight buffers normally because they are managed by framework and will not exceed unlimited, only if the netty buffer pool may cause that I experienced before.

One possible way to avoid backpressure is to increase the parallelism of slow node(groupby in your case) or decrease the parallelism of fast node(source in your case).

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 18:16
收件人:chesnay <[hidden email]>
抄 送:wangzhijiang999 <[hidden email]>; user <[hidden email]>
主 题:Re: Backpressure? for Batches

Thanks, Now back to my question again. How can I say read at less speed from hdfs than my say map or group by can consume? Is there some sort of configuration which says read only 10000 rows and then stop and then reread etc. Otherwise source will keep on sending the data or keeping in some sort of buffers and will be OOM. Or setting different parallelism while reading is the way to handle this?

Thanks

On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler <[hidden email]> wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is data to be consumed, i.e. one the first record was emitted by the previous operator. As such back-pressure exists in batch just like in streaming.

On 29.08.2018 11:39, Darshan Singh wrote:
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks



Reply | Threaded
Open this post in threaded view
|

回复:Backpressure? for Batches

Zhijiang(wangzhijiang999)
You can check the log to show the related stack in OOM, maybe we can confirm some reasons.
Or you can dump the heap to analyze the memory usages after OOM.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 19:22
收件人:wangzhijiang999 <[hidden email]>
抄 送:chesnay <[hidden email]>; user <[hidden email]>
主 题:Re: Backpressure? for Batches

Thanks,

I thought either Group by is causing the OOM but it is mentioned that sort will be spilled to disk so that there is no way for that to cause the OOM. So I was looking maybe due to back pressure some of data read from hdfs is kept in memory as it is not consumed and that is causing OOM.So it seems this is not possible as well so need to relook what could be causing the OOM.

Thanks

On Wed, Aug 29, 2018 at 12:41 PM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
chesnay is right. For batch job there are two ways for notifying consumable. One is the first record emitted by upstream and the other is the upstream finishes all the records (blocking mode).

For your case, the slow groupby node will trigger back-pressure and block the upstreams until source node. But it will not cause OOM for caching in-flight buffers normally because they are managed by framework and will not exceed unlimited, only if the netty buffer pool may cause that I experienced before.

One possible way to avoid backpressure is to increase the parallelism of slow node(groupby in your case) or decrease the parallelism of fast node(source in your case).

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 18:16
收件人:chesnay <[hidden email]>
抄 送:wangzhijiang999 <[hidden email]>; user <[hidden email]>
主 题:Re: Backpressure? for Batches

Thanks, Now back to my question again. How can I say read at less speed from hdfs than my say map or group by can consume? Is there some sort of configuration which says read only 10000 rows and then stop and then reread etc. Otherwise source will keep on sending the data or keeping in some sort of buffers and will be OOM. Or setting different parallelism while reading is the way to handle this?

Thanks

On Wed, Aug 29, 2018 at 12:11 PM Chesnay Schepler <[hidden email]> wrote:
The semantics for LAZY_FROM_SOURCE are that tasks are scheduled when there is data to be consumed, i.e. one the first record was emitted by the previous operator. As such back-pressure exists in batch just like in streaming.

On 29.08.2018 11:39, Darshan Singh wrote:
Thanks,

My job is simple. I am using table Api
1. Read from hdfs
2. Deserialize json to pojo and convert to table.
3. Group by some columns.
4. Convert back to dataset and write back to hdfs.

In the WebUI I can see at least first 3 running concurrently which sort of makes sense. From your answer I understood that flink will do first number 1 once that is completed it will do map(or grouping as well) and then grouping and finally the write. Thus, there should be 1 task running at 1 time. This doesnt seem right to me or I misunderstood what you said.

So here if my group by is slow then I expect some sort of back pressure on the deserialise part or maybe reading from hdfs itself? 

Thanks

On Wed, Aug 29, 2018 at 11:03 AM Zhijiang(wangzhijiang999) <[hidden email]> wrote:
The backpressure is caused when downstream and upstream are running concurrently, and the downstream is slower than the upstream.
In stream job, the schedule mode will schedule both sides concurrently, so the backpressure may exist.
As for batch job, the default schedule mode is LAZY_FROM_SOURCE I remember, that means the downstream will be scheduled after upstream finishes, so the slower downstream will not block upstream running, then the backpressure may not exist in this case.

Best,
Zhijiang
------------------------------------------------------------------
发件人:Darshan Singh <[hidden email]>
发送时间:2018年8月29日(星期三) 16:20
收件人:user <[hidden email]>
主 题:Backpressure? for Batches

I faced the issue with back pressure in streams. I was wondering if we could face the same with the batches as well.

In theory it should be possible. But in Web UI for backpressure tab for batches I was seeing that it was just showing the tasks status and no status like "OK" etc.

So I was wondering if backpressure is a thing for batches. If yes, how do we reduce this especially if I am reading from hdfs.

Thanks