Tuning checkpoint


祁明良

Hi all,


I have several questions regarding checkpointing. As background, I'm using a ProcessFunction keyed by user_id that works roughly like the following:

inputStream
  .keyBy(x => getUserKey(x))
  .process(...)
It runs on YARN with 40 TMs * 2 slots each. When I look at the checkpoint metrics, only a small number of subtasks have a large "alignment buffered/duration", and it looks like the 2 slots on the same TM are either both high or both low. What could cause this?
  1. Maybe data skew, but the amount of data looks almost the same across subtasks.
  2. Or the network?
  3. The system is under back pressure, but I don't understand why only about 4 out of 80 subtasks behave like this.
Another question is about the alignment buffer. I thought it was only used when there are multiple input streams. For a keyed process function, what is actually aligned?
The last question is about tuning RocksDB. I am trying to assign some memory to the write buffer and block cache, and the doc says "typically by decreasing the JVM heap size of the TaskManagers by the same amount", while the TaskManager heap size is described as "On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value." So it looks like I should decrease the TaskManager heap, but that value is set automatically on YARN. What should I do?
Best,
Mingliang


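This email and its attachments contain confidential information of Xiaohongshu and are intended only for the recipients or groups listed above. No other person may use the information in this email in any form (including but not limited to disclosing, copying, or distributing it in whole or in part). If you have received this email in error, please notify the sender immediately by phone or email and delete this email!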
This communication may contain privileged or other confidential information of Red. If you have received it in error, please advise the sender by reply e-mail and immediately delete the message and any attachments without copying or disclosing the contents. Thank you.


Attachment: WX20180813-001831.png (134K)

Re: Tuning checkpoint

Fabian Hueske-2
Hi Mingliang,

Let me answer your second question first:

> Another question is about the alignment buffer, I thought it was only used for multiple input stream cases. But for keyed process function , what is actually aligned?

When a task sends records to multiple downstream tasks (tasks, not operators!) due to a broadcast or partition/keyBy/shuffle connection, the task broadcasts each checkpoint barrier to all of its receiving tasks.
Therefore, each task that receives records from multiple tasks will receive multiple checkpoint barriers. (Checkpoint barriers behave similarly to watermarks in this regard.)
In order to provide exactly-once state consistency, a task must buffer records from every input connection that has already forwarded a barrier until the barriers from all input connections have been received and the state checkpoint has been initiated.
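
As a rough illustration of the alignment described above, here is a minimal Scala sketch of a single task with several input channels. It is a toy model and not Flink's actual implementation; the names `AlignmentSketch`, `Record`, `Barrier`, `Result` and `align` are all hypothetical. Records arriving on a channel that has already delivered its barrier are held back until the barriers from all channels have arrived.

```scala
// Toy model of checkpoint barrier alignment for ONE task (assumption: not Flink code).
object AlignmentSketch {
  sealed trait Event
  final case class Record(channel: Int, value: String) extends Event
  final case class Barrier(channel: Int) extends Event

  // processed: order in which the task handles records (plus a checkpoint marker)
  // heldBack:  records that had to wait in the alignment buffer
  final case class Result(processed: Vector[String], heldBack: Vector[String])

  def align(numChannels: Int, events: Seq[Event]): Result = {
    var blocked   = Set.empty[Int]        // channels whose barrier already arrived
    var buffer    = Vector.empty[String]  // records waiting for alignment to finish
    var processed = Vector.empty[String]
    var heldBack  = Vector.empty[String]

    events.foreach {
      case Record(ch, v) if blocked(ch) =>
        // This channel is already "past" the barrier: buffer instead of processing.
        buffer = buffer :+ v
        heldBack = heldBack :+ v
      case Record(_, v) =>
        processed = processed :+ v
      case Barrier(ch) =>
        blocked = blocked + ch
        if (blocked.size == numChannels) {
          // Last barrier arrived: take the state snapshot, then release the buffer.
          processed = (processed :+ "<checkpoint>") ++ buffer
          buffer = Vector.empty
          blocked = Set.empty
        }
    }
    Result(processed, heldBack)
  }
}
```

With two channels and the event sequence `Record(0,"a"), Barrier(0), Record(0,"b"), Record(1,"c"), Barrier(1), Record(1,"d")`, only `"b"` is held back: it arrives on channel 0 after that channel's barrier but before the barrier of channel 1.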

What does this mean for the long checkpoint alignment that you observe?
Checkpoint alignment starts when the first barrier is received and ends when the last barrier is received.
Hence, it seems as if one TaskManager receives some barrier(s) later than the other nodes, probably because it is more heavily loaded.
The fact that all affected tasks run on the same TM, together with the backpressure you mentioned, hints at that, because a TM multiplexes the network connections of all its tasks.
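
The definition of alignment duration above can be written down as a small Scala sketch. The function name `alignmentMillis` and the timestamps are made up for illustration; they are not taken from the poster's metrics. The point is that a single late barrier from one slow upstream connection is enough to make the whole alignment long.

```scala
// Alignment duration of one subtask for one checkpoint (hypothetical helper).
object AlignmentDuration {
  // barrierArrivals: arrival time in ms of the barrier on each input channel
  def alignmentMillis(barrierArrivals: Seq[Long]): Long =
    if (barrierArrivals.isEmpty) 0L
    else barrierArrivals.max - barrierArrivals.min // last barrier minus first barrier
}
```

For example, barriers arriving at 100 ms, 105 ms and 4100 ms give an alignment of 4000 ms, even though two of the three channels delivered their barrier almost at the same time.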

Regarding the memory configuration question, I am not sure if there is a way to override the JVM heap size on YARN. Maybe others can answer this question.

Best,
Fabian

2018-08-12 18:36 GMT+02:00 祁明良 <[hidden email]>:

[...]

Re: Tuning checkpoint

祁明良
Thank you for this great answer, Fabian.

Regarding the YARN JVM heap size, I tried to change
containerized.heap-cutoff-ratio: 0.25
and it seems to work, but the actual memory needed for RocksDB still looks like a black box to me. I see there’s already a JIRA ticket about this problem [1], created last year and still open. All I can do is keep increasing this value until YARN stops killing my TaskManager because of memory usage :)
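
To make the effect of the cutoff ratio concrete, here is a simplified Scala sketch. It assumes that the heap is the container size minus the larger of a minimum cutoff and `ratio * container size`, with the minimum taken from `containerized.heap-cutoff-min` and assumed to be 600 MB here. This is a simplification: the real calculation may subtract further components such as network buffers, and `heapMb` is a hypothetical helper, not a Flink API.

```scala
// Simplified model of how a YARN container is split into JVM heap and cutoff.
object YarnHeapSketch {
  // Assumption: cutoff = max(minCutoffMb, cutoffRatio * containerMb)
  def heapMb(containerMb: Int, cutoffRatio: Double, minCutoffMb: Int = 600): Int = {
    val cutoff = math.max(minCutoffMb.toLong, math.round(containerMb * cutoffRatio))
    (containerMb - cutoff).toInt // the cutoff stays outside the heap, e.g. for RocksDB
  }
}
```

Under these assumptions an 8192 MB container with a ratio of 0.25 leaves a 6144 MB heap and 2048 MB outside it. Raising the ratio shrinks the heap and leaves more room for native memory.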

By the way, my rough calculation of the RocksDB memory on each TM is:
num of slots per task * num of stateful operators (including source and sink?) * (block cache size + write buffer size)

I bet it’s not correct.
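
For reference, the rough estimate above can be expressed as a small Scala sketch. It only reproduces the formula as posted, which the poster doubts, and it assumes "slots per task" means slots per TaskManager. The function name `roughMb` and the example numbers are hypothetical. RocksDB's real footprint also depends on other settings, such as the number of write buffers per state and whether index and filter blocks count against the block cache.

```scala
// The poster's rough per-TM estimate, reproduced as-is (not a verified formula).
object RocksDbEstimate {
  def roughMb(slots: Int, statefulOperators: Int, blockCacheMb: Int, writeBufferMb: Int): Int =
    slots * statefulOperators * (blockCacheMb + writeBufferMb)
}
```

With 2 slots, 3 stateful operators, a 256 MB block cache and a 64 MB write buffer, the estimate comes to 1920 MB per TM.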

Best, 
Mingliang


On 13 Aug 2018, at 11:05 PM, Fabian Hueske <[hidden email]> wrote:

[...]

Re: Tuning checkpoint

Hequn Cheng
Hi Mingliang,

Regarding your first question, I answered it on Stack Overflow [1].
Hope it helps.

Best, Hequn



On Tue, Aug 14, 2018 at 10:16 AM, 祁明良 <[hidden email]> wrote:
[...]