Failed to recover jobs after jobmanager leader switches

Yan

The job on our Flink cluster has been going down frequently of late. We have two JobManagers. According to the log, it seems that whenever a new JobManager is elected, Flink fails to recover the job that was previously running. We are using flink-1.0.3. Does anybody know if this is a bug in Flink? Thanks
***************************************************
Log:

2017-03-08 10:58:26,995 INFO  org.apache.zookeeper.ClientCnxn                               - Socket connection established to zk4.prod.crawler.eniro/172.27.164.33:2181, initiating session
2017-03-08 10:58:26,999 INFO  org.apache.zookeeper.ClientCnxn                               - Session establishment complete on server zk4.prod.crawler.eniro/172.27.164.33:2181, sessionid = 0x45aa9fe58210001, negotiated timeout = 40000
2017-03-08 10:58:26,999 INFO  org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
2017-03-08 10:58:28,026 INFO  org.apache.flink.runtime.checkpoint.SavepointStoreFactory     - Using job manager savepoint state backend.
2017-03-08 10:58:28,033 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Starting JobManager at akka.tcp://flink@172.27.163.235:41745/user/jobmanager.
2017-03-08 10:58:28,033 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Starting ZooKeeperLeaderElectionService.
2017-03-08 10:58:28,035 INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist           - Started memory archivist akka://flink/user/archive
2017-03-08 10:58:28,042 INFO  org.apache.flink.runtime.webmonitor.WebRuntimeMonitor         - Starting with JobManager akka.tcp://flink@172.27.163.235:41745/user/jobmanager on port 8081
2017-03-08 10:58:28,042 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService.
2017-03-08 10:58:28,059 INFO  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - New leader reachable under akka.tcp://flink@172.27.163.227:51695/user/jobmanager:226ccebc-2e70-4885-9251-e998f5a17fab.
2017-03-09 12:56:27,594 INFO  org.apache.flink.runtime.jobmanager.JobManager                - JobManager akka.tcp://flink@172.27.163.235:41745/user/jobmanager was granted leadership with leader session ID Some(c7a0ea7d-615d-49db-8ed7-e0fbfe6db84d).
2017-03-09 12:56:27,608 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Delaying recovery of all jobs by 10000 milliseconds.
2017-03-09 12:56:27,610 INFO  org.apache.flink.runtime.webmonitor.JobManagerRetriever       - New leader reachable under akka.tcp://flink@172.27.163.235:41745/user/jobmanager:c7a0ea7d-615d-49db-8ed7-e0fbfe6db84d.
2017-03-09 12:56:28,700 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app85 (akka.tcp://flink@172.27.165.62:36947/user/taskmanager) as 2ed17350a36c6263eca36b8826bf33a2. Current number of registered hosts is 1. Current number of alive task slots is 4.
2017-03-09 12:56:28,749 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app27 (akka.tcp://flink@172.27.164.5:50417/user/taskmanager) as ef1bf43a08af2568ab8357244bf3a593. Current number of registered hosts is 2. Current number of alive task slots is 8.
2017-03-09 12:56:28,863 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app83 (akka.tcp://flink@172.27.165.58:54983/user/taskmanager) as 83da4c68f0ccd31f333273f4952c2264. Current number of registered hosts is 3. Current number of alive task slots is 12.
2017-03-09 12:56:28,871 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app84 (akka.tcp://flink@172.27.165.60:47594/user/taskmanager) as 18e1d4c1c48641deb3a6b906d4f57dd3. Current number of registered hosts is 4. Current number of alive task slots is 16.
2017-03-09 12:56:29,003 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app25 (akka.tcp://flink@172.27.163.241:36720/user/taskmanager) as a6e2e9a04ab54b904505082d9c1ab91d. Current number of registered hosts is 5. Current number of alive task slots is 20.
2017-03-09 12:56:29,015 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app87 (akka.tcp://flink@172.27.165.66:37593/user/taskmanager) as f1efe6c42b134c949e560d158383f67f. Current number of registered hosts is 6. Current number of alive task slots is 24.
2017-03-09 12:56:29,305 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app26 (akka.tcp://flink@172.27.163.245:36015/user/taskmanager) as 41e36e8a11ae2ce6f9c179980712a0f9. Current number of registered hosts is 7. Current number of alive task slots is 28.
2017-03-09 12:56:29,424 INFO  org.apache.flink.runtime.instance.InstanceManager             - Registered TaskManager at app86 (akka.tcp://flink@172.27.165.64:38634/user/taskmanager) as fe87705cc642577ca250334db2b1cc54. Current number of registered hosts is 8. Current number of alive task slots is 32.
2017-03-09 12:56:37,627 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Attempting to recover all jobs.
2017-03-09 12:56:37,694 ERROR org.apache.flink.runtime.jobmanager.JobManager                - Fatal error: Failed to recover jobs.
java.io.FileNotFoundException: /apps/flink/recovery/submittedJobGraphf771daf77330 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:52)
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:143)
        at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:51)
        at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
        at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:433)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:429)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:429)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:429)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:425)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:425)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
*************************************************************

The jobs found on our 2 jobmanagers at /apps/flink-1.0.3/recovery/ are:
On 172.27.163.227:
drwxr-xr-x 3 deploycrawler eniro   4096 May 18  2016 blob
-rw-r--r-- 1 deploycrawler eniro  50799 Jul  1  2016 submittedJobGraph0089f7ab6a51
-rw-r--r-- 1 deploycrawler eniro 281895 Mar  9 16:03 submittedJobGraph4a209890a35d
-rw-r--r-- 1 deploycrawler eniro 281894 Mar  8 11:15 submittedJobGraph5b24d24191d0
-rw-r--r-- 1 deploycrawler eniro 280165 Mar  1 15:33 submittedJobGraphf771daf77330

On 172.27.163.235:
drwxr-xr-x 3 deploycrawler eniro   4096 Jul  1  2016 blob
-rw-r--r-- 1 deploycrawler eniro  50791 Jul  1  2016 submittedJobGraphb6357063f81b
-rw-r--r-- 1 deploycrawler eniro 281011 Sep 13 16:12 submittedJobGraphc80bdf54209a

It seems that it looks up the job to recover on the wrong server. Also, some of the job files seem to be very old. I am confused about when Flink saves jobs here.
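
To make the mismatch concrete, here is a minimal sketch that checks which host actually holds the file named in the exception. The file names are copied from the two listings above; the `hosts_with_file` helper and the variable names are made up for illustration and are not part of Flink. Only base names are compared, since the log path (/apps/flink/recovery/) and the listed directory (/apps/flink-1.0.3/recovery/) are spelled differently.

```python
import os

# File names copied from the two directory listings (blob directories omitted).
listings = {
    "172.27.163.227": {
        "submittedJobGraph0089f7ab6a51",
        "submittedJobGraph4a209890a35d",
        "submittedJobGraph5b24d24191d0",
        "submittedJobGraphf771daf77330",
    },
    "172.27.163.235": {
        "submittedJobGraphb6357063f81b",
        "submittedJobGraphc80bdf54209a",
    },
}


def hosts_with_file(path, listings):
    """Return the sorted hosts whose listing contains the base name of `path`."""
    name = os.path.basename(path)
    return sorted(host for host, files in listings.items() if name in files)


# Path from the FileNotFoundException, and the host that was granted leadership.
missing = "/apps/flink/recovery/submittedJobGraphf771daf77330"
new_leader = "172.27.163.235"

found_on = hosts_with_file(missing, listings)
print(found_on)                # ['172.27.163.227']
print(new_leader in found_on)  # False
```

So the job graph the new leader tries to open is only present on the other JobManager's local disk.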