Flink weird checkpointing behaviour

Flink weird checkpointing behaviour

Pawel Bartoszek
Hi,

We have just upgraded from Flink 1.3.2 to Flink 1.5.2 on EMR. We have noticed that some checkpoints are taking a very long time to complete, and some of them even fail with this exception:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-665361795]] after [60000 ms].

We have noticed that Checkpoint Duration (Async) accounts for most of the checkpoint time compared to Checkpoint Duration (Sync). I thought that async checkpoints were only offered by the RocksDB state backend. We use the filesystem state backend.

We didn't have such problems on Flink 1.3.2.

Thanks,
Pawel

Flink configuration
 
akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://....
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true

Re: Flink weird checkpointing behaviour

Yun Tang
Hi Pawel

First of all, I don't think the akka timeout exception is related to the checkpoints taking a long time. Both RocksDBStateBackend and FsStateBackend can have an async part of the checkpoint, which in general uploads data to the DFS. That's why the async part takes more time than the sync part of the checkpoint in most cases.
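
To make the sync/async split concrete, here is a minimal sketch in plain Java. It is not Flink code: the class `TwoPhaseSnapshot`, its methods, and the 50 ms sleep are hypothetical stand-ins, and the sync phase takes a full copy only for simplicity. It shows a short in-thread snapshot followed by a slower background write, which is why the async duration usually dominates.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: hypothetical names, not Flink's classes or internals.
class TwoPhaseSnapshot {
    // Result of the background phase: number of entries "uploaded".
    static final class Handle {
        final int entries;
        Handle(int entries) { this.entries = entries; }
    }

    private final Map<String, Long> state = new HashMap<>();

    void update(String key, long value) { state.put(key, value); }

    // Sync phase: runs in the processing thread, so it must be short.
    // Here it only takes a point-in-time copy of the state.
    Map<String, Long> snapshotSync() {
        return new HashMap<>(state);
    }

    // Async phase: write the copy on another thread.
    // The sleep stands in for a slow write to a remote file system.
    static CompletableFuture<Handle> uploadAsync(Map<String, Long> copy, ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new Handle(copy.size());
        }, io);
    }

    public static void main(String[] args) {
        TwoPhaseSnapshot task = new TwoPhaseSnapshot();
        task.update("a", 1L);
        task.update("b", 2L);

        ExecutorService io = Executors.newSingleThreadExecutor();
        Map<String, Long> copy = task.snapshotSync();
        CompletableFuture<Handle> pending = uploadAsync(copy, io);

        // Processing continues while the upload is in flight;
        // this update is not part of the snapshot.
        task.update("c", 3L);

        System.out.println("entries in checkpoint: " + pending.join().entries);
        io.shutdown();
    }
}
```

In this model the time spent in `snapshotSync` corresponds to the sync duration and the time until the future completes corresponds to the async duration.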

You could check whether the checkpoint alignment time is much longer than before. Back pressure in a job causes downstream tasks to receive checkpoint barriers later, and a task must receive barriers from all of its inputs before it triggers its checkpoint [1]. If the long alignment time is the main contributor to the overall checkpoint duration, you should look at the tasks that cause the back pressure.

Also, the long checkpoint time might be caused by low write performance of the DFS.
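
As a rough illustration of why back pressure inflates alignment time, the sketch below models a single task with several input channels. It is plain Java, not Flink code: the class `BarrierAlignment`, the method `alignmentMillis`, and the arrival timestamps are all made up for the example. Alignment is taken as the gap between the first and the last barrier arriving at the task.

```java
import java.util.Arrays;

// Illustrative model, not Flink code: a task with several input channels
// can only snapshot once a barrier has arrived on every channel.
class BarrierAlignment {
    // Time between the first and the last barrier arriving at one task.
    static long alignmentMillis(long[] barrierArrivalMillis) {
        long first = Arrays.stream(barrierArrivalMillis).min()
                .orElseThrow(IllegalArgumentException::new);
        long last = Arrays.stream(barrierArrivalMillis).max()
                .orElseThrow(IllegalArgumentException::new);
        return last - first;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times (ms) of one checkpoint's barriers
        // on three input channels of the same task.
        long[] healthy = {1000, 1010, 1025};
        long[] backPressured = {1000, 1010, 9000}; // one slow upstream channel

        System.out.println("healthy: " + alignmentMillis(healthy) + " ms");
        System.out.println("back-pressured: " + alignmentMillis(backPressured) + " ms");
    }
}
```

A single slow input channel is enough to stretch the alignment for the whole task, so the slowest upstream path is the one to look for.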

Best
Yun Tang


From: Pawel Bartoszek <[hidden email]>
Sent: Wednesday, October 24, 2018 23:11
To: User
Subject: Flink weird checkpointing behaviour
 

Re: Flink weird checkpointing behaviour

Dawid Wysakowicz-2

Hi,

I think it is definitely worth checking the alignment time, as Yun Tang suggested. There were some changes in the network stack between those versions that could influence this behavior.


I've also added Stefan in cc, who might have more ideas about what would be worth checking.

Best,

Dawid


On 31/10/2018 16:51, Yun Tang wrote:
