S3 Checkpointing taking long time with stateful operations


Kathula, Sandeep

Hi,

We are running a stateful application in Flink with RocksDB as the state backend. Incremental checkpointing is enabled and checkpoints are written to S3.

  • 10 task managers, each with 2 task slots
  • Checkpoint interval: 3 minutes
  • Checkpointing mode: at-least-once

After the app has been running for 2-3 days, an end-to-end checkpoint takes almost 2 minutes, with a sync time of 2 sec and an async time of 15 sec at most. Initially, when the state is small, checkpointing takes 10-15 sec. Because the checkpointing mode is at-least-once, the alignment duration is 0. We see a dip in processing during checkpointing and could not find out what the actual issue is.

We also tried remote HDFS for checkpointing but observed similar behavior.

We have a couple of questions:

  • When sync time is at most 2 sec and async time is 15 sec, why does end-to-end checkpointing take almost 2 minutes?
  • How can we reduce the checkpoint time?

[Screenshot attached]

Any help would be appreciated.

Thank you
Sandeep Kathula

Re: S3 Checkpointing taking long time with stateful operations

Yun Tang
Hi Sandeep

At-least-once checkpoint mode does not need to align barriers, so the longer end-to-end duration is mainly because the checkpoint barrier cannot be processed by the operators quickly enough.
An operator only starts its checkpoint after it has processed the checkpoint barrier, so I think you should check the back-pressure status of your job[1].
Back-pressure makes the checkpoint barrier travel downstream more slowly through the network channels.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure
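
A rough way to see how much of the end-to-end time is not spent in the snapshot itself is to subtract the sync and async durations from the end-to-end duration. The Python sketch below does this with the figures from the original post. Treating "almost 2 minutes" as 120 seconds is an approximation, the function name barrier_wait_seconds is made up for illustration, and the decomposition is a simplification rather than Flink's exact accounting.

```python
def barrier_wait_seconds(end_to_end: float, sync: float, async_: float,
                         alignment: float = 0.0) -> float:
    """Return the part of the end-to-end duration that the snapshot phases do not explain.

    All arguments are in seconds. In at-least-once mode the alignment duration is 0,
    so the remainder is mostly time the barrier needed to reach the operator.
    """
    return end_to_end - (sync + async_ + alignment)


# Figures from the original post: ~2 min end-to-end (assumed 120 s), 2 s sync, 15 s async.
wait = barrier_wait_seconds(end_to_end=120.0, sync=2.0, async_=15.0)
print(f"{wait:.0f} s not explained by the snapshot phases")
```

Under these assumptions about 103 of the 120 seconds pass before the snapshot even starts, which points at slow barrier propagation rather than at S3 or RocksDB.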


Best
Yun Tang

In reply to the post by Kathula, Sandeep, sent Friday, June 19, 2020 9:19

Re: S3 Checkpointing taking long time with stateful operations

Congxian Qiu
Hi

From the description and the given figure, the end-to-end time for one task is longer than $time{sync-snapshot} + $time{async-snapshot}.
For at-least-once mode, could you please enable the debug log to track the barrier align process?
You should find debug log entries such as:
"Received barrier for checkpoint {} from channel {}"
"Received cancellation barrier for checkpoint {} "
"Received all barriers for checkpoint {}"

Best,
Congxian


In reply to the post by Yun Tang, sent Friday, June 19, 2020, 11:48 AM

Re: S3 Checkpointing taking long time with stateful operations

Kathula, Sandeep

Hi Congxian,

Thanks for the reply. I enabled debug logs and see that it took more than a minute to receive the barriers for a given checkpoint from all the task slots.
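
To put a number on this, the gap between the first and the last barrier message per checkpoint can be computed from the debug log. The Python sketch below is only an illustration: it assumes each line starts with a timestamp in the form yyyy-MM-dd HH:mm:ss,SSS and contains the messages quoted earlier in the thread with the placeholders filled in. The sample lines and the logger name are made up and are not taken from barriers.log.

```python
import re
from datetime import datetime
from typing import Dict, Iterable, Tuple

# Matches the timestamp at the start of a line and the checkpoint id in either
# "Received barrier for checkpoint N from channel M" or
# "Received all barriers for checkpoint N". Cancellation barriers are ignored.
LINE = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*"
    r"Received (?:barrier for checkpoint (\d+) from channel"
    r"|all barriers for checkpoint (\d+))"
)


def barrier_spread(lines: Iterable[str]) -> Dict[int, float]:
    """Seconds between the first and last barrier message, keyed by checkpoint id."""
    seen: Dict[int, Tuple[datetime, datetime]] = {}
    for line in lines:
        match = LINE.search(line)
        if not match:
            continue
        ts = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
        checkpoint = int(match.group(2) or match.group(3))
        first, last = seen.get(checkpoint, (ts, ts))
        seen[checkpoint] = (min(first, ts), max(last, ts))
    return {cp: (last - first).total_seconds() for cp, (first, last) in seen.items()}


# Hypothetical log lines, for illustration only.
sample = [
    "2020-06-22 20:00:00,000 DEBUG x - Received barrier for checkpoint 7 from channel 0",
    "2020-06-22 20:01:05,500 DEBUG x - Received barrier for checkpoint 7 from channel 1",
    "2020-06-22 20:01:05,500 DEBUG x - Received all barriers for checkpoint 7",
]
print(barrier_spread(sample))
```

A large spread for a checkpoint means some input channels delivered their barrier much later than others, which helps narrow down which upstream subtasks are slow.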

We are reading from multiple Kafka input topics. Could this be the reason for this behavior? Or do I need to change any RocksDB settings? We mainly observe this behavior with a stateful operator that keeps its state in RocksDB with incremental checkpointing.

We have 10 task managers, each with 2 task slots, 4 CPUs, and 20 GB of memory, of which 12 GB is heap memory. Parallelism is 20.

Please find the barrier logs attached.

Any input on how to solve this?

Thanks
Sandeep Kathula

In reply to the post by Congxian Qiu, sent Saturday, June 20, 2020 at 7:19 PM
Attachment: barriers.log (26K)

Re: S3 Checkpointing taking long time with stateful operations

Dawid Wysakowicz-2

Hi Sandeep,

I am not sure if you received the message from Yun Tang. I think he made a good point there. The problem might be that the operators take too much time processing regular records, which delays the processing of the checkpoint barriers. If that's the case, you might want to try increasing the parallelism of the slow operators or revisit your processing logic.

Quoting Yun Tang's message:

At-least-once checkpoint mode does not need to align barriers, so the longer end-to-end duration is mainly because the checkpoint barrier cannot be processed by the operators quickly enough.

An operator only starts its checkpoint after it has processed the checkpoint barrier, so I think you should check the back-pressure status of your job[1].

Back-pressure makes the checkpoint barrier travel downstream more slowly through the network channels.

 

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure

Best,

Dawid


In reply to the post by Kathula, Sandeep, sent 22/06/2020 20:21


Re: S3 Checkpointing taking long time with stateful operations

Kathula, Sandeep
Hi Dawid,
We saw that the backpressure is almost 0 for all our operators, but we still see the lag increasing when reading from the Kafka topics. When I take a savepoint and restart from it with checkpointing disabled, the lag decreases. So we think there must be some problem with checkpointing, as it takes around 2 minutes and we see a dip in processing while it runs.

Thanks
Sandeep Kathula


In reply to the post by Dawid Wysakowicz, sent 6/22/20, 11:17 PM

Attachments: Screen Shot 2020-06-23 at 9.52.24 AM.png (652K), Screen Shot 2020-06-23 at 9.52.12 AM.png (570K)

Re: S3 Checkpointing taking long time with stateful operations

Dawid Wysakowicz-2

I see you have a lot of operators chained in a single task. In this scenario the backpressure monitoring will not be very helpful: it only shows that the second task does not slow down the first one. It does not tell you whether some of the operators in the first task take longer and slow down the processing, delaying the checkpoint barrier at the same time.

Maybe it would make sense to enable the latency markers[1] and check whether some of the operators in the first task add significant latency. As a side note, the backpressure and latency topics are described very well in this blog post[2].

Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking

[2] https://flink.apache.org/2019/07/23/flink-network-stack-2.html

In reply to the post by Kathula, Sandeep, sent 24/06/2020 02:46


Re: S3 Checkpointing taking long time with stateful operations

Kathula, Sandeep

Hi Dawid,

Thanks for helping us. As per the doc you shared, I set metrics.latency.interval to 1000 and metrics.latency.granularity to subtask in flink-conf.yaml. We are using Flink version 1.9. But when I go to the Flink UI and search for latency in the metrics, I can't find any. I also tried the REST API to see if there are any metrics containing latency but couldn't find them. In the configuration section of the Job Manager tab in the Flink UI, I can see that both properties are set. Am I missing anything?
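
For reference, the REST check described above can be scripted along these lines. This Python sketch only filters a metrics listing for names containing "latency". It assumes the listing is a JSON array of objects with an "id" field, as returned by a metrics endpoint such as /jobs/<job-id>/metrics. The sample payload and the metric name in it are hypothetical and are not output from the job discussed here.

```python
import json
from typing import List


def latency_metric_ids(metrics_json: str) -> List[str]:
    """Return the ids of all listed metrics whose name contains 'latency'."""
    metrics = json.loads(metrics_json)
    return [m["id"] for m in metrics if "latency" in m.get("id", "").lower()]


# Hypothetical response body, shaped like a metrics listing.
sample = '[{"id": "numRecordsIn"}, {"id": "latency.operator_id.abc.latency_p99"}]'
print(latency_metric_ids(sample))
```

Running the same filter against the listings for different scopes (job, task, task manager) shows whether the latency metrics are missing altogether or only registered somewhere other than where the UI search looks.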

 

 

 

Thanks
Sandeep Kathula

In reply to the post by Dawid Wysakowicz, sent Wednesday, June 24, 2020 at 12:26 AM
To: "Kathula, Sandeep" <[hidden email]>, "[hidden email]" <[hidden email]>
Cc: "Vora, Jainik" <[hidden email]>, "Rosensweig, JD" <[hidden email]>
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

I see you have a lot of operators chained in a single task. In this scenario the backpressure monitoring will not be very helpful it just shows that the second task does not slow down the first one. It does not tell though if some of the operators in the first task take longer and slow down the processing, at the same time delaying the checkpoint barrier.

Maybe it would make sense to try enabling the latency markers[1] and check if some of the operators in the first task add significant latencies. Just as a side note the backpressure and latency topics are very well described in this blog post[2].

Best,

Dawid

 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking

[2] https://flink.apache.org/2019/07/23/flink-network-stack-2.html

On 24/06/2020 02:46, Kathula, Sandeep wrote:

Hi Dawid,

We saw that the backpressure is almost 0 for all our operators. But still we see lag increasing when reading from kafka topics. When I take a savepoint and restart from savepoint without checkpointing, I can see that lag is reducing. So we think that there must be some problem with the checkpointing as it is taking around 2 minutes and we are seeing dip In processing during checkpointing. 

 

 

Thanks

Sandeep Kathula

 

 

From: Dawid Wysakowicz [hidden email]
Date: Monday, June 22, 2020 at 11:17 PM
To: "Kathula, Sandeep" [hidden email], [hidden email] [hidden email]
Cc: "Vora, Jainik" [hidden email], "Rosensweig, JD" [hidden email]
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

Hi Sandeep,

I am not sure if you received the message from Yun Tang. I think he made a good point there. The problem might be that the operators take too much time processing regular records which delays the checkpoint barriers processing. If that's the case you might want to try increasing the parallelism for the slow operators or revisit your processing logic.

At-least-once checkpoint mode would not need to align barrier and the longer end-to-end duration is mainly due to barrier cannot be processed by operator as soon as possible.

Operator will only start checkpoint after processed checkpoint barrier, I think you might need to check the back-pressure status of your job[1].

Back-pressure would make the checkpoint barrier move to downstream more slowly in the network channels.

 

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure

Best,

Dawid

 

On 22/06/2020 20:21, Kathula, Sandeep wrote:

Hi Congxian,

Thanks for the reply. I enabled debug logs and I see that it took more than a minute to get barriers for a given checkpoint from all the task slots.

We are reading from multiple kafka input topics. Is this the reason for this behavior?  Or, do I need to change any settings related to RocksDB (we are mainly observing this behavior with stateful operator which does incremental state store to RocksDB)?

We have 10 task managers each with 2 task slots, 4 CPU, 20GB memory out of which 12GB is heap memory. Parallelism is 20.

 

Please find the logs for barriers attached.

 

Any inputs on how to solve this? 

 

 

Thanks

Sandeep Kathula

From: Congxian Qiu [hidden email]
Date: Saturday, June 20, 2020 at 7:19 PM
To: [hidden email] [hidden email]
Cc: "Kathula, Sandeep" [hidden email], "Vora, Jainik" [hidden email], "Rosensweig, JD" [hidden email]
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

This email is from an external sender.

 

Hi

 

From the description and the given figure. the e2e time for one task is longer than $time{sync-snapshot} + $time{async-snapshot}. 

For at least once mode, could you please try to enable the debug log to track the barrier align process?

you can find the debug log such as

"Received barrier for checkpoint {} from channel {}"

"Received cancellation barrier for checkpoint {} "

"Received all barriers for checkpoint {}"

 

Best,

Congxian

 

 


Re: S3 Checkpointing taking long time with stateful operations

Congxian Qiu
Hi Sandeep

Sorry for the late reply.

First, let me correct my wording in the previous email: by "barrier align" I meant that the snapshot is triggered only after all the barriers have been received, so we can enable the debug log to track when the barriers arrive.
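
To make the barrier tracking above concrete, here is a minimal sketch that measures, per checkpoint, how long a task waited between its first "Received barrier" line and its "Received all barriers" line. It assumes the lines come from a single task and start with a timestamp in the shape `yyyy-MM-dd HH:mm:ss,SSS`; the function name `barrier_wait_seconds` is hypothetical, and the pattern needs adjusting if your log layout differs.

```python
import re
from datetime import datetime

# Assumed line shape: "<yyyy-MM-dd HH:mm:ss,SSS> ... Received barrier for checkpoint N ..."
# Cancellation barriers ("Received cancellation barrier ...") do not match.
LINE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}).*?"
    r"Received (?P<kind>barrier|all barriers) for checkpoint (?P<id>\d+)"
)


def barrier_wait_seconds(lines):
    """Map checkpoint id -> seconds from the first barrier to 'all barriers'."""
    first, done = {}, {}
    for line in lines:
        m = LINE.search(line)
        if not m:
            continue
        ts = datetime.strptime(m.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
        cp = int(m.group("id"))
        if m.group("kind") == "barrier":
            first.setdefault(cp, ts)  # keep only the earliest barrier
        else:
            done[cp] = ts
    # Report only checkpoints for which both events were seen.
    return {cp: (done[cp] - first[cp]).total_seconds() for cp in done if cp in first}
```

A large value for a checkpoint would suggest that most of the end-to-end duration is spent waiting for barriers rather than in the sync or async snapshot phases.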

If the stateful operator acks slowly, I don't think you need to change the RocksDBStateBackend settings, because what needs fixing here is the slow barrier.

I think the latency markers Dawid mentioned can help here, but I'm not very familiar with them.

From my experience, I would try the following:
1) Check whether there is any data skew (sort the subtasks by bytes received).
2) Find out whether there are hot methods in the task that sends barriers slowly; async-profiler[1] may help you here. If the "Enrich with Session" operator acks slowly, try to find out whether there are hot methods in the "Sources" operator.
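
As a rough illustration of the data-skew check in step 1, the sketch below compares the busiest subtask with the average. The helper name `skew_ratio` is hypothetical, and the byte counts would have to be copied by hand from the "Bytes received" column of the subtasks view; nothing here reads from Flink itself.

```python
def skew_ratio(bytes_received):
    """Return max / mean of per-subtask byte counts; 1.0 means perfectly even."""
    if not bytes_received:
        raise ValueError("need at least one subtask")
    mean = sum(bytes_received) / len(bytes_received)
    # If every subtask received nothing, treat the load as even.
    return max(bytes_received) / mean if mean else 1.0
```

A ratio well above 1 would hint that a few subtasks receive most of the data and may be the ones holding back the barriers.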


PS: I noticed that the first email says "We are seeing a dip in processing during this time"; maybe we need to find the reason for this too. Does the program interact with external services? If yes, what was the state of those services during this time?


Best,
Congxian


On Thu, Jun 25, 2020 at 6:17 AM, Kathula, Sandeep <[hidden email]> wrote:

Hi Dawid,

    Thanks for helping us. As per the doc you shared, I set metrics.latency.interval to 1000 and metrics.latency.granularity to subtask within flink-conf.yaml. We are using Flink version 1.9. But when I go to the Flink UI and search for latency in metrics, I can’t find any. I also tried via the REST API to see if there are any metrics containing latency but couldn’t find them. In the configuration of the job manager tab within the Flink UI, I can see that both properties are set. Am I missing anything?

 

 

 

Thanks

Sandeep Kathula

 

 

 

From: Dawid Wysakowicz <[hidden email]>
Date: Wednesday, June 24, 2020 at 12:26 AM
To: "Kathula, Sandeep" <[hidden email]>, "[hidden email]" <[hidden email]>
Cc: "Vora, Jainik" <[hidden email]>, "Rosensweig, JD" <[hidden email]>
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

I see you have a lot of operators chained in a single task. In this scenario the backpressure monitoring will not be very helpful; it just shows that the second task does not slow down the first one. It does not tell you, though, whether some of the operators in the first task take longer and slow down the processing, at the same time delaying the checkpoint barrier.

Maybe it would make sense to try enabling the latency markers[1] and check if some of the operators in the first task add significant latencies. Just as a side note the backpressure and latency topics are very well described in this blog post[2].

Best,

Dawid

 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking

[2] https://flink.apache.org/2019/07/23/flink-network-stack-2.html

On 24/06/2020 02:46, Kathula, Sandeep wrote:

Hi Dawid,

We saw that the backpressure is almost 0 for all our operators. But we still see lag increasing when reading from Kafka topics. When I take a savepoint and restart from the savepoint without checkpointing, I can see that the lag is reducing. So we think that there must be some problem with the checkpointing, as it is taking around 2 minutes and we are seeing a dip in processing during checkpointing.

 

 

Thanks

Sandeep Kathula

 

 

From: Dawid Wysakowicz [hidden email]
Date: Monday, June 22, 2020 at 11:17 PM
To: "Kathula, Sandeep" [hidden email], [hidden email] [hidden email]
Cc: "Vora, Jainik" [hidden email], "Rosensweig, JD" [hidden email]
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

Hi Sandeep,

I am not sure if you received the message from Yun Tang. I think he made a good point there. The problem might be that the operators take too much time processing regular records which delays the checkpoint barriers processing. If that's the case you might want to try increasing the parallelism for the slow operators or revisit your processing logic.

At-least-once checkpoint mode would not need to align barrier and the longer end-to-end duration is mainly due to barrier cannot be processed by operator as soon as possible.

Operator will only start checkpoint after processed checkpoint barrier, I think you might need to check the back-pressure status of your job[1].

Back-pressure would make the checkpoint barrier move to downstream more slowly in the network channels.

 

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure

Best,

Dawid

 

On 22/06/2020 20:21, Kathula, Sandeep wrote:

Hi Congxian,

Thanks for the reply. I enabled debug logs and I see that it took more than a minute to get barriers for a given checkpoint from all the task slots.

We are reading from multiple Kafka input topics. Is this the reason for this behavior? Or do I need to change any settings related to RocksDB (we are mainly observing this behavior with a stateful operator which does incremental state store to RocksDB)?

We have 10 task managers each with 2 task slots, 4 CPU, 20GB memory out of which 12GB is heap memory. Parallelism is 20.

 

Please find the logs for barriers attached.

 

Any inputs on how to solve this? 

 

 

Thanks

Sandeep Kathula

From: Congxian Qiu [hidden email]
Date: Saturday, June 20, 2020 at 7:19 PM
To: [hidden email] [hidden email]
Cc: "Kathula, Sandeep" [hidden email], "Vora, Jainik" [hidden email], "Rosensweig, JD" [hidden email]
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

This email is from an external sender.

 

Hi

 

From the description and the given figure, the e2e time for one task is longer than $time{sync-snapshot} + $time{async-snapshot}.

For at-least-once mode, could you please enable the debug log to track the barrier align process?

You can find debug log lines such as:

"Received barrier for checkpoint {} from channel {}"

"Received cancellation barrier for checkpoint {} "

"Received all barriers for checkpoint {}"

 

Best,

Congxian

 

 


Re: S3 Checkpointing taking long time with stateful operations

Dawid Wysakowicz-2

Hi,

As for the latency markers, they will not be visible in the UI as they are not scoped to any operator; they are in a job scope. As far as I can tell there is no view in the UI for that kind of metric. You will need to have them reported somewhere. If you do not have any metrics collector in place, you can try enabling the slf4j metrics reporter [1]. With this enabled you should see the metrics in the log files.

BTW, I think the suggestions from Congxian on how to track down a slow operator are also valid.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter
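
For reference, a flink-conf.yaml fragment combining the latency settings mentioned in this thread with the slf4j reporter might look like the sketch below. The two latency values are the ones Sandeep reported using; the reporter name `slf4j` and the 60-second interval are assumptions for illustration, so check the key names against the metrics documentation for your Flink version.

```yaml
# Latency tracking, with the values used in this thread
metrics.latency.interval: 1000
metrics.latency.granularity: subtask

# slf4j reporter; the name "slf4j" and the interval are illustrative assumptions
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
```

With a fragment like this in place, the reported metrics should show up in the job manager and task manager log files, not in the web UI.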


Re: S3 Checkpointing taking long time with stateful operations

Kathula, Sandeep

Hi Dawid,

    I enabled slf4j metrics but couldn’t see the latency metrics in either the job manager or the task manager. In fact, I am not finding any histograms within the metrics. I set metrics.latency.interval to 1000 and metrics.latency.granularity to subtask within flink-conf.yaml. Is there anything else we need to do? Meanwhile we are also trying to attach a profiler and see what’s going wrong.

 

Thanks

Sandeep Kathula

 


Re: S3 Checkpointing taking long time with stateful operations

Dawid Wysakowicz-2

Did you move the reporter's jar from /opt to /lib [1]? Can you look for the reporter's class name in the logs? If I am not mistaken, there should be an entry saying whether or not the reporter was initialized.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#reporter

On 26/06/2020 04:19, Kathula, Sandeep wrote:

Hi Dawid,

    I enabled slf4j metrics but couldn’t see the latency metrics in either job manager or task manager. Infact I am not finding any histograms within metrics. I set metrics.latency.interval to 1000 and metrics.latency.granularity to subtask  within flink-conf.yaml.  Is there anything else we need to do?  Meanwhile we are also trying to attach profiler and see what’s going wrong.

 

Thanks

Sandeep Kathula

 

From: Dawid Wysakowicz [hidden email]
Date: Thursday, June 25, 2020 at 1:42 AM
To: Congxian Qiu [hidden email], "Kathula, Sandeep" [hidden email]
Cc: [hidden email] [hidden email], "Vora, Jainik" [hidden email], "Rosensweig, JD" [hidden email]
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

Hi,

As for the latency markers, they will not be visible in the UI as they are not scoped to any operator. They are in a job scope. As far as I can tell there is no view in the UI for those kind of metrics. You will need to have them reported somewhere. If you do not have any metrics collector in place, you can try enabling the slf4j metrics reporter [1]. With this enabled you should see the metrics in log files.

BTW, I think suggestions from Congxian how you can track a slow operator are also valid.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter

On 25/06/2020 04:43, Congxian Qiu wrote:

Hi Sandeep

 

Sorry for the late reply.

 

First, correct my previous words "barrier align", in the previous email, I want to say "the snapshot will be triggered after received all the barriers" -- so we can enable the debug log to track the barrier receive process.

 

If the stateful operator ack slowly, then I don't think you need to change settings of RocksDBStateBackend here, because we need to fix the slow barrier here.

 

I think latency markers Dawid mentioned can help here, but I'm not very familiar with it.

 

From my experience, I'll try to do the following things: 

1) whether there is any data skew (sort by bytes received).

2) find out is there some hot methods in the task(who send barrier slowly), maybe async-profile[1] can help you here. If "Enrich with Session" operator acks slowly, then you can try to find out is there any hot methods in "Souces" operator.

 

 

PS: I notice in the first email, there is "We are seeing a dip in processing during this time" maybe we need to find out this reason too (is the program will Interact with external services? if yes, what about the service state during this time).

 


Best,

Congxian

 

 

Kathula, Sandeep <[hidden email]> 2020625日周四 上午6:17写道:

Hi Dawid,

    Thanks for helping us. As per the doc you shared, I set metrics.latency.interval to 1000 and metrics.latency.granularity to subtask  within flink-conf.yaml. We are using Flink version 1.9.  But when I go to Flink UI and search for latency in metrics, I can’t find any. I also tried with via rest API to see if there any metrics containing latency but couldn’t find them. In configuration of the job manager tab within Flink UI, I can see that both the properties are set. Am I missing anything?

 

 

 

Thanks

Sandeep Kathula

 

 

 

From: Dawid Wysakowicz <[hidden email]>
Date: Wednesday, June 24, 2020 at 12:26 AM
To: "Kathula, Sandeep" <[hidden email]>, "[hidden email]" <[hidden email]>
Cc: "Vora, Jainik" <[hidden email]>, "Rosensweig, JD" <[hidden email]>
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

I see you have a lot of operators chained in a single task. In this scenario the backpressure monitoring will not be very helpful it just shows that the second task does not slow down the first one. It does not tell though if some of the operators in the first task take longer and slow down the processing, at the same time delaying the checkpoint barrier.

Maybe it would make sense to try enabling the latency markers[1] and check if some of the operators in the first task add significant latencies. Just as a side note the backpressure and latency topics are very well described in this blog post[2].

Best,

Dawid

 

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/monitoring/metrics.html#latency-tracking

[2] https://flink.apache.org/2019/07/23/flink-network-stack-2.html

On 24/06/2020 02:46, Kathula, Sandeep wrote:

Hi Dawid,

We saw that the backpressure is almost 0 for all our operators. But still we see lag increasing when reading from kafka topics. When I take a savepoint and restart from savepoint without checkpointing, I can see that lag is reducing. So we think that there must be some problem with the checkpointing as it is taking around 2 minutes and we are seeing dip In processing during checkpointing. 

 

 

Thanks

Sandeep Kathula

 

 

From: Dawid Wysakowicz <[hidden email]>
Date: Monday, June 22, 2020 at 11:17 PM
To: "Kathula, Sandeep" <[hidden email]>, "[hidden email]" <[hidden email]>
Cc: "Vora, Jainik" <[hidden email]>, "Rosensweig, JD" <[hidden email]>
Subject: Re: S3 Checkpointing taking long time with stateful operations

 

Hi Sandeep,

I am not sure if you received the message from Yun Tang. I think he made a good point there. The problem might be that the operators take too much time processing regular records, which delays processing of the checkpoint barriers. If that's the case, you might want to try increasing the parallelism of the slow operators or revisit your processing logic.

At-least-once checkpoint mode does not need to align barriers, so the longer end-to-end duration is mainly because the operators cannot process the barrier as soon as it is emitted.

An operator only starts its checkpoint after it has processed the checkpoint barrier, so I think you might need to check the back-pressure status of your job[1].

Back-pressure makes the checkpoint barrier move downstream more slowly through the network channels.

 

[1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html#back-pressure

Best,

Dawid

 

On 22/06/2020 20:21, Kathula, Sandeep wrote:

Hi Congxian,

Thanks for the reply. I enabled debug logs and I see that it took more than a minute to get barriers for a given checkpoint from all the task slots.

We are reading from multiple Kafka input topics. Is this the reason for this behavior? Or do I need to change any settings related to RocksDB (we mainly observe this behavior with the stateful operator, which stores its state incrementally in RocksDB)?

We have 10 task managers, each with 2 task slots, 4 CPUs, and 20 GB of memory, of which 12 GB is heap memory. Parallelism is 20.

 

Please find the logs for barriers attached.

 

Any inputs on how to solve this? 

 

 

Thanks

Sandeep Kathula

From: Congxian Qiu <[hidden email]>
Date: Saturday, June 20, 2020 at 7:19 PM
To: "[hidden email]" <[hidden email]>
Cc: "Kathula, Sandeep" <[hidden email]>, "Vora, Jainik" <[hidden email]>, "Rosensweig, JD" <[hidden email]>
Subject: Re: S3 Checkpointing taking long time with stateful operations

 


 

Hi

 

From the description and the given figure, the e2e time for one task is longer than $time{sync-snapshot} + $time{async-snapshot}.
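
Plugging in the numbers reported earlier in the thread (almost 2 minutes end to end, 2 s sync, 15 s async at most) makes the gap concrete. The sketch below treats the end-to-end duration as barrier wait plus sync plus async, which is a simplification, and the function name is illustrative.

```python
def unexplained_seconds(e2e_s, sync_s, async_s):
    """Time in the end-to-end duration not covered by the snapshot phases."""
    return e2e_s - (sync_s + async_s)


# Figures from the original report: ~120 s end to end, 2 s sync, 15 s async.
gap = unexplained_seconds(e2e_s=120, sync_s=2, async_s=15)
print(f"{gap} s of the checkpoint is spent outside the snapshot itself")
```

Roughly 103 of the 120 seconds are therefore spent before the task begins its snapshot, which is why tracking when the barriers arrive is the next step.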

For at-least-once mode, could you please enable the debug log to track the barrier alignment process?

You can find debug log lines such as:

"Received barrier for checkpoint {} from channel {}"

"Received cancellation barrier for checkpoint {} "

"Received all barriers for checkpoint {}"
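
Once debug logging is on, the time between the first barrier and the last one can be pulled out of the log with a small script. This is a rough sketch rather than a finished tool: it assumes each log line starts with a timestamp in the form `yyyy-MM-dd HH:mm:ss,SSS` and that the lines passed in belong to a single task, so that checkpoint IDs are not mixed across tasks. Only the two message templates quoted above are matched.

```python
import re
from datetime import datetime

# Assumed timestamp layout at the start of each log line.
TS = r"(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})"
FIRST = re.compile(TS + r".*Received barrier for checkpoint (?P<id>\d+) from channel")
ALL = re.compile(TS + r".*Received all barriers for checkpoint (?P<id>\d+)")


def _parse(ts):
    """Parse the assumed timestamp layout into a datetime."""
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S,%f")


def barrier_wait_seconds(lines):
    """Map checkpoint id -> seconds from the first barrier to the last one."""
    first_seen, waits = {}, {}
    for line in lines:
        m = FIRST.search(line)
        if m:
            # Keep only the earliest barrier seen for each checkpoint.
            first_seen.setdefault(int(m["id"]), _parse(m["ts"]))
            continue
        m = ALL.search(line)
        if m and int(m["id"]) in first_seen:
            cid = int(m["id"])
            waits[cid] = (_parse(m["ts"]) - first_seen[cid]).total_seconds()
    return waits
```

Feeding it the lines of one task's log, for example `barrier_wait_seconds(open("taskmanager.log"))` with a hypothetical file name, gives one wait per checkpoint; a wait close to the end-to-end duration points at slow barrier propagation rather than at the snapshot itself.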

 

Best,

Congxian

 

 
