[Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint


[Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

Steven Wu
When we start a high-parallelism (1,600) job without any checkpoint/savepoint, the job struggled to be deployed. After a few restarts, it eventually got deployed and was running fine after the initial struggle. jobmanager was very busy. Web UI was very slow. I saw these two exceptions/failures during the initial failures.

I don't seem to see this issue when starting the same job from an external checkpoint, or at least only very rarely.

Has anyone else experienced a similar issue?

Thanks,
Steven

Exception #1

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id fe55bf158e89cf555be6582e577b9621 timed out.
    at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
    at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



Exception #2

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627 not found.
    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
    at java.util.TimerThread.mainLoop(Timer.java:555)
    at java.util.TimerThread.run(Timer.java:505)



Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

Andrey Zagrebin
Hi Steven,

Did you not experience this problem with the previous Flink release (you marked the topic with 1.7)?

Do you use an HA setup?

Without an HA setup, the blob data that belongs to the job is distributed from the job master node to all task executors.
Depending on the size of the blob data (jars, serialised user classes, etc.), this can overwhelm the job master node and its network connections.
That in turn can slow down the UI, heartbeating, and the initialisation of task executors and produced partitions, because the task executors all contend for the blob data. When the job is restored, the blob data might not need to be fetched because it is already available.

With an HA setup, you can configure high-availability.storageDir to point to a DFS, and the DFS will then serve the blob data.
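
For example, the relevant entries in flink-conf.yaml would look roughly like this (the quorum and storage path below are only placeholders for your own environment):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.storageDir: hdfs:///flink/ha/

With this in place, task executors can fetch the job's blobs from the DFS instead of all pulling them from the job master node.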

Otherwise, could you provide the JM log for further investigation?
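
As a side note, the two exceptions from the original post can also be given more headroom in flink-conf.yaml while the root cause is being investigated. This is only a rough sketch with placeholder values (check the defaults of your Flink version):

heartbeat.interval: 10000
heartbeat.timeout: 120000
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 30000

Raising these only makes the TaskManager heartbeat timeout and the PartitionNotFoundException less likely during a slow startup; it does not address the blob distribution itself.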

Best,
Andrey


Re: [Flink 1.7.0] initial failures with starting high-parallelism job without checkpoint/savepoint

Steven Wu
Hi Andrey,

Weird that I didn't see your reply in my email inbox. My colleague happened to see it in the Apache archive :)

Nope, we didn't experience it with 1.4 (the previous version we ran).

Yes, we do use an HA setup:
high-availability: zookeeper
high-availability.zookeeper.quorum: ...
high-availability.zookeeper.path.root: ...
high-availability.zookeeper.path.latch: /leaderlatch
high-availability.zookeeper.path.leader: /leader
high-availability.zookeeper.path.jobgraphs: /jobgraphs
high-availability.zookeeper.path.checkpoints: /checkpoints
recovery.zookeeper.path.checkpoint-counter: /checkpoint-counter
high-availability.storageDir: ...

My colleague (Mark Cho) will provide some additional observations.

Thanks,
Steven
