Hi,

We have just upgraded to Flink 1.5.2 on EMR from Flink 1.3.2. We have noticed that some checkpoints are taking a very long time to complete, and some of them even fail with the exception:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-665361795]] after [60000 ms].

We have noticed that Checkpoint Duration (Async) is taking most of the checkpoint time compared to Checkpoint Duration (Sync). I thought that async checkpoints were only offered by the RocksDB state backend. We use the filesystem state backend. We didn't have such problems on Flink 1.3.2.

Thanks,
Pawel

Flink configuration

akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://....
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true
Hi Pawel
First of all, I don't think the akka timeout exception is related to the checkpoints taking a long time. Both RocksDBStateBackend and FsStateBackend have an asynchronous part of the checkpoint, which in general uploads the data to the DFS. That is why the async part takes more time than the sync part of the checkpoint in most cases.
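As a minimal sketch of what that looks like on the job side (assuming the Java DataStream API; the checkpoint URI below is a placeholder, not your actual bucket), the FsStateBackend can be constructed with asynchronous snapshots requested explicitly:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsBackendAsyncSnapshots {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FsStateBackend also has an async checkpoint phase: the sync part takes a
        // snapshot of the in-memory state, the async part uploads it to the DFS.
        // The second constructor argument asks for asynchronous snapshots explicitly.
        // "s3a://my-bucket/checkpoints" is a placeholder checkpoint URI.
        env.setStateBackend(new FsStateBackend("s3a://my-bucket/checkpoints", true));

        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        // ... define sources, transformations and sinks here, then call env.execute().
    }
}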
You could check whether the checkpoint alignment time is much longer than before. Back pressure in a job causes downstream tasks to receive checkpoint barriers later, and a task must receive all barriers from its inputs before it can trigger its checkpoint [1].
If a long checkpoint alignment time is what mainly drives the overall checkpoint duration, you should look at the tasks that cause the back pressure.
The long checkpoint time might also be caused by low write performance of the DFS.
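If back pressure does turn out to be the cause, you could also give the job more headroom between checkpoints. A rough sketch (the values are only illustrative, not recommendations for your particular job):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuning {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds.
        env.enableCheckpointing(60_000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Abort a checkpoint that has not completed within 10 minutes, e.g. because
        // barriers are delayed by back pressure or because DFS writes are slow.
        checkpointConfig.setCheckpointTimeout(10 * 60 * 1000);

        // Leave at least 30 seconds between the end of one checkpoint and the start
        // of the next, so a slow checkpoint does not immediately trigger another one.
        checkpointConfig.setMinPauseBetweenCheckpoints(30_000);

        // Allow only one checkpoint in flight at a time.
        checkpointConfig.setMaxConcurrentCheckpoints(1);

        // ... define the job and execute as usual.
    }
}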
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/internals/stream_checkpointing.html#barriers
Best
Yun Tang
From: Pawel Bartoszek <[hidden email]>
Sent: Wednesday, October 24, 2018 23:11
To: User
Subject: Flink weird checkpointing behaviour

Hi,
We have just upgraded to Flink 1.5.2 on EMR from Flink 1.3.2. We have noticed that some checkpoints are taking a very long time to complete, and some of them even fail with the exception:
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/jobmanager_0#-665361795]] after [60000 ms].
We have noticed that Checkpoint Duration (Async) is taking most of the checkpoint time compared to Checkpoint Duration (Sync).
I thought that async checkpoints were only offered by the RocksDB state backend. We use the filesystem state backend.
We didn't have such problems on Flink 1.3.2.
Thanks,
Pawel
Flink configuration
akka.ask.timeout 60 s
classloader.resolve-order parent-first
containerized.heap-cutoff-ratio 0.15
env.hadoop.conf.dir /etc/hadoop/conf
env.yarn.conf.dir /etc/hadoop/conf
high-availability zookeeper
high-availability.cluster-id application_1540292869184_0001
high-availability.zookeeper.path.root /flink
high-availability.zookeeper.quorum ip-10-4-X-X.eu-west-1.compute.internal:2181
high-availability.zookeeper.storageDir hdfs:///flink/recovery
internal.cluster.execution-mode NORMAL
internal.io.tmpdirs.use-local-default true
io.tmp.dirs /mnt/yarn/usercache/hadoop/appcache/application_1540292869184_0001
jobmanager.heap.mb 3072
jobmanager.rpc.address ip-10-4-X-X.eu-west-1.compute.internal
jobmanager.rpc.port 41219
jobmanager.web.checkpoints.history 1000
parallelism.default 32
rest.address ip-10-4-X-X.eu-west-1.compute.internal
rest.port 0
state.backend filesystem
state.backend.fs.checkpointdir s3a://....
state.checkpoints.dir s3a://...
state.savepoints.dir s3a://...
taskmanager.heap.mb 6600
taskmanager.numberOfTaskSlots 1
web.port 0
web.tmpdir /tmp/flink-web-c3d16e22-1a33-46a2-9825-a6e268892199
yarn.application-attempts 10
yarn.maximum-failed-containers -1
zookeeper.sasl.disable true
Hi, I think it is definitely worth checking the alignment time, as Yun Tang suggested. There were some changes in the network stack between those versions that could influence this behavior.
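The alignment time shows up in the checkpoint statistics of the web UI, and the same numbers are exposed over the JobManager's REST API. A rough sketch of pulling them programmatically (the REST address and job id below are placeholders; with rest.port set to 0 on YARN the actual port is assigned at random and shown in the web UI):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchCheckpointStats {

    public static void main(String[] args) throws Exception {
        // Placeholders: replace with the JobManager's actual REST address and the
        // job id shown in the web UI.
        String restBase = "http://localhost:8081";
        String jobId = "00000000000000000000000000000000";

        // The checkpoint statistics endpoint returns the JSON that backs the
        // "Checkpoints" tab of the web UI.
        URL url = new URL(restBase + "/jobs/" + jobId + "/checkpoints");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Raw JSON; look at the per-checkpoint entries for the durations.
                System.out.println(line);
            }
        }
    }
}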
I've also added Stefan as cc, who might have more ideas about what would be worth checking. Best, Dawid
On 31/10/2018 16:51, Yun Tang wrote: