When we start a high-parallelism (1,600) job without any checkpoint/savepoint, the job struggles to deploy. After a few restarts, it eventually gets deployed and runs fine after the initial struggle. The jobmanager is very busy and the web UI is very slow. I saw these two exceptions/failures during the initial failures. I don't seem to see this issue when starting the same job from an external checkpoint, or at least very rarely. Has anyone else experienced a similar issue?

Thanks,
Steven

Exception #1

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id fe55bf158e89cf555be6582e577b9621 timed out.
	at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1624)
	at org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Exception #2

org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition 3cdc03de22920b0dc1ef3d82e06b7d75@7fda7294a52a6da2d831d832223d3627 not found.
	at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:77)
	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:111)
	at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:155)
	at java.util.TimerThread.mainLoop(Timer.java:555)
	at java.util.TimerThread.run(Timer.java:505)
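(For reference, the heartbeat timeout that Exception #1 hits is configurable in flink-conf.yaml. The values below are illustrative, not a recommendation; the Flink 1.7 defaults are 10000 ms for the interval and 50000 ms for the timeout. Raising the timeout can ride out a temporarily overloaded JobManager during mass deployment, at the cost of slower failure detection:)

```yaml
# Heartbeat between JobManager and TaskManagers, in milliseconds.
heartbeat.interval: 10000
# How long a missing heartbeat is tolerated before the peer is declared dead.
heartbeat.timeout: 120000
```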
Hi Steven,

Did you experience this problem with a previous Flink release? (You marked the topic with 1.7.) Do you use an HA setup?

Without an HA setup, the blob data belonging to the job (jars, user-serialized classes, etc.) is distributed from the job master node to all task executors. Depending on its size, it can overwhelm the job master node and the network connections. This can subsequently slow down the UI, heartbeating, and the initialization of task executors and produced partitions, because the task executors all contend for the blob data. When the job is restored, the blob data might not need to be fetched because it is already available locally.

With an HA setup, you can configure high-availability.storageDir on a DFS, and the DFS will serve the blob data.

Otherwise, could you provide the JM log for further investigation?

Best,
Andrey

On Thu, Jan 24, 2019 at 7:06 AM Steven Wu <[hidden email]> wrote:
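(Andrey's suggestion could look like the flink-conf.yaml fragment below. The ZooKeeper quorum and the HDFS path are placeholders, not values from this thread:)

```yaml
# ZooKeeper-based high availability. With storageDir on a DFS, blobs and
# recovery metadata are served from shared storage instead of being pushed
# from the JobManager to every TaskManager.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.storageDir: hdfs:///flink/ha/
```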
Hi Andrey,

Weird that I didn't see your reply in my email inbox. My colleague happened to see it in the Apache archive :)

Nope, we didn't experience it with 1.4 (our previous version). Yes, we do use an HA setup:

high-availability: zookeeper

Thanks,
Steven