Hi,
I am relatively new to Flink and was experimenting with the savepoints feature. I have an HA cluster running with 1 master and 4 workers. The flink-conf.yaml is as follows:
#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net
# The port where the JobManager's main actor system listens for messages.
jobmanager.rpc.port: 6123
# The heap size for the JobManager JVM
jobmanager.heap.mb: 512
# The heap size for the TaskManager JVM
taskmanager.heap.mb: 2048
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 8
# Specify whether TaskManager memory should be allocated when starting up (true) or when
# memory is required in the memory manager (false)
taskmanager.memory.preallocate: false
# The parallelism used for programs that did not specify any other parallelism.
parallelism.default: 1
env.java.home: /usr/local/java
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 10 s
#==============================================================================
# Web Frontend
#==============================================================================
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
jobmanager.web.port: 8081
# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.
#jobmanager.web.submit.enable: false
#==============================================================================
# Streaming state checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends: jobmanager, filesystem, <class-name-of-factory>
#
state.backend: filesystem
# Directory for storing checkpoints in a Flink-supported filesystem
# Note: State backend must be accessible from the JobManager and all TaskManagers.
# Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
# (or any local file system under Windows), or "s3://" for the S3 file system.
#
state.backend.fs.checkpointdir: file:///home/amallem/
state.savepoints.dir: file:///home/amallem/save/
#==============================================================================
# Master High Availability (required configuration)
#==============================================================================
# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
#
recovery.mode: zookeeper
#
recovery.zookeeper.quorum: stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
#
# Note: You need to set the state backend to 'filesystem' and the checkpoint
# directory (see above) before configuring the storageDir.
#
recovery.zookeeper.storageDir: file:///home/amallem/recovery
recovery.zookeeper.path.root: /flink
recovery.zookeeper.path.namespace: /cluster_one
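(For completeness: savepoints build on the checkpointing mechanism, which is enabled per job in the program itself. A minimal sketch of that, assuming the Scala DataStream API; the interval below is an example value, not necessarily the one I use:)

```scala
import org.apache.flink.streaming.api.scala._

object CheckpointedJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpoint every 10 seconds; with state.backend: filesystem, the
    // checkpoint data lands under state.backend.fs.checkpointdir above.
    env.enableCheckpointing(10000)
  }
}
```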
Query: I am able to take a savepoint and then cancel my current job, but when I try to start the job again from the savepoint, I get the error "ProgramInvocationException: JobManager did not respond within
60000ms". I checked ZooKeeper and everything seems fine. I also followed the recommendations in the following post: http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
Is there any configuration I am missing to enable restarting the job? Any help will be appreciated. Thanks.
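For reference, the sequence of CLI commands I am using is roughly the following (the job ID and savepoint path are placeholders, not the real values):

```shell
# Trigger a savepoint for the running job; the command prints the savepoint path.
bin/flink savepoint <jobID>

# Cancel the running job.
bin/flink cancel <jobID>

# Resume the job from the savepoint path printed above.
bin/flink run -s <savepointPath> -c com.tfs.rtdp.precompute.Flink.FlinkTest uber-flink-test-1.0-SNAPSHOT.jar
```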
Regards,
Anirudh
Hi,
The issue seems to be connected with restarting the job in detached mode. The stack trace is as follows:
-bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c com.tfs.rtdp.precompute.Flink.FlinkTest /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar file:///home/amallem/capitalone.properties
Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
Using address 10.64.119.90:33167 to connect to JobManager.
JobManager web interface address http://10.64.119.90:8081
Starting execution of program
Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after job submission.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: JobManager did not respond within 60000 milliseconds
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager did not respond within 60000 milliseconds
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
... 8 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [60000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at scala.concurrent.Await.result(package.scala)
at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
... 9 more
When running without detached mode, the JobManager responds, but the job fails because it thinks the structure of the job has been modified:
-bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c com.tfs.rtdp.precompute.Flink.FlinkTest /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar file:///home/amallem/capitalone.properties
Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
Using address 10.64.119.90:33167 to connect to JobManager.
JobManager web interface address http://10.64.119.90:8081
Starting execution of program
Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036]
10/18/2016 14:56:11 Job execution switched to status FAILING.
org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
... 10 more
10/18/2016 14:56:11 Source: Custom Source(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Filter(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Map(1/1) switched to CANCELED
10/18/2016 14:56:11 Sink: Unnamed(1/1) switched to CANCELED
10/18/2016 14:56:11 Job execution switched to status FAILED.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141)
at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable failure. This suppresses job restarts. Please check the stack trace for the root cause.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
... 2 more
Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint jobmanager://savepoints/1. Cannot map old state for task 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
... 10 more
Thanks.
From: Anirudh Mallem
Date: Tuesday, October 18, 2016 at 2:07 PM
To: "[hidden email]"
Subject: Issue while restarting from SavePoint
Hey Anirudh!
As you say, this looks like two issues: (1) In detached mode the configuration seems to be not picked up correctly. That should be independent of the savepoints. Can you confirm this? (2) The program was changed in a non-compatible way after the savepoint. Did you change the program and if yes in which way? – Ufuk On Wed, Oct 19, 2016 at 12:01 AM, Anirudh Mallem <[hidden email]> wrote: > Hi, > The issue seems to be connected with trying to restart the job in the > detached mode. The stack trace is as follows: > > -bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c > com.tfs.rtdp.precompute.Flink.FlinkTest > /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar > file:///home/amallem/capitalone.properties > Cluster configuration: Standalone cluster with JobManager at > /10.64.119.90:33167 > Using address 10.64.119.90:33167 to connect to JobManager. > JobManager web interface address http://10.64.119.90:8081 > Starting execution of program > Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after > job submission. 
> > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: JobManager did not respond within 60000 milliseconds > at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378) > at > org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323) > at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager > did not respond within 60000 milliseconds > at > org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227) > at > org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429) > ... 8 more > Caused by: java.util.concurrent.TimeoutException: Futures timed out after > [60000 milliseconds] > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) > at > scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) > at scala.concurrent.Await$.result(package.scala:107) > at scala.concurrent.Await.result(package.scala) > at > org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223) > ... 
9 more > > When running it without the detached mode, the job manager is responding but > the job is failing as it thinks that the structure of the job is modified. > > -bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c > com.tfs.rtdp.precompute.Flink.FlinkTest > /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar > file:///home/amallem/capitalone.properties > Cluster configuration: Standalone cluster with JobManager at > /10.64.119.90:33167 > Using address 10.64.119.90:33167 to connect to JobManager. > JobManager web interface address http://10.64.119.90:8081 > Starting execution of program > Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job > completion. > Connected to JobManager at > Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036] > 10/18/2016 14:56:11 Job execution switched to status FAILING. > org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable > failure. This suppresses job restarts. Please check the stack trace for the > root cause. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint > jobmanager://savepoints/1. Cannot map old state for task > 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the > program has been changed in a non-compatible way after the savepoint. > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302) > ... 
10 more > 10/18/2016 14:56:11 Source: Custom Source(1/1) switched to CANCELED > 10/18/2016 14:56:11 Map(1/1) switched to CANCELED > 10/18/2016 14:56:11 Filter(1/1) switched to CANCELED > 10/18/2016 14:56:11 Map(1/1) switched to CANCELED > 10/18/2016 14:56:11 Map(1/1) switched to CANCELED > 10/18/2016 14:56:11 Map(1/1) switched to CANCELED > 10/18/2016 14:56:11 Sink: Unnamed(1/1) switched to CANCELED > 10/18/2016 14:56:11 Job execution switched to status FAILED. > > ------------------------------------------------------------ > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405) > at > org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585) > at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141) > at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320) > at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: org.apache.flink.runtime.execution.SuppressRestartsException: > Unrecoverable failure. This suppresses job restarts. Please check the stack > trace for the root cause. 
> at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > ... 2 more > Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint > jobmanager://savepoints/1. Cannot map old state for task > 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the > program has been changed in a non-compatible way after the savepoint. > at > org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248) > at > org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302) > ... 10 more > > Thanks. > > From: Anirudh Mallem > Date: Tuesday, October 18, 2016 at 2:07 PM > To: "[hidden email]" > Subject: Issue while restarting from SavePoint > > Hi, > I am relatively new to Flink and I was experimenting with the save points > feature. I have an HA cluster running with 1 Master and 4 Workers. 
> The flink-config.yaml is as follows:
>
> #==============================================================================
> # Common
> #==============================================================================
> jobmanager.rpc.address: stable-stream-master01.app.shared.int.sv2.247-inc.net
>
> # The port where the JobManager's main actor system listens for messages.
> jobmanager.rpc.port: 6123
>
> # The heap size for the JobManager JVM
> jobmanager.heap.mb: 512
>
> # The heap size for the TaskManager JVM
> taskmanager.heap.mb: 2048
>
> # The number of task slots that each TaskManager offers. Each slot runs one
> # parallel pipeline.
> taskmanager.numberOfTaskSlots: 8
>
> # Specify whether TaskManager memory should be allocated when starting up
> # (true) or when memory is required in the memory manager (false)
> taskmanager.memory.preallocate: false
>
> # The parallelism used for programs that did not specify any other
> # parallelism.
> parallelism.default: 1
>
> env.java.home: /usr/local/java
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 2
> restart-strategy.fixed-delay.delay: 10 s
>
> #==============================================================================
> # Web Frontend
> #==============================================================================
>
> # The port under which the web-based runtime monitor listens.
> # A value of -1 deactivates the web server.
> jobmanager.web.port: 8081
>
> # Flag to specify whether job submission is enabled from the web-based
> # runtime monitor. Uncomment to disable.
> #jobmanager.web.submit.enable: false
>
> #==============================================================================
> # Streaming state checkpointing
> #==============================================================================
>
> # The backend that will be used to store operator state checkpoints if
> # checkpointing is enabled.
> #
> # Supported backends: jobmanager, filesystem, <class-name-of-factory>
> #
> state.backend: filesystem
>
> # Directory for storing checkpoints in a Flink-supported filesystem
> # Note: State backend must be accessible from the JobManager and all TaskManagers.
> # Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
> # (or any local file system under Windows), or "S3://" for S3 file system.
> #
> state.backend.fs.checkpointdir: file:///home/amallem/
> state.savepoints.dir: file:///home/amallem/save/
>
> #==============================================================================
> # Master High Availability (required configuration)
> #==============================================================================
>
> # The list of ZooKeeper quorum peers that coordinate the high-availability
> # setup. This must be a list of the form:
> # "host1:clientPort,host2[:clientPort],..." (default clientPort: 2181)
> #
> recovery.mode: zookeeper
> #
> recovery.zookeeper.quorum: stable-stream-zookeeper01.app.shared.int.net:2181,stable-stream-zookeeper02.app.shared.int.net:2181
> #
> # Note: You need to set the state backend to 'filesystem' and the checkpoint
> # directory (see above) before configuring the storageDir.
> #
> recovery.zookeeper.storageDir: file:///home/amallem/recovery
> recovery.zookeeper.path.root: /flink
> recovery.zookeeper.path.namespace: /cluster_one
>
> Query: I am able to take a save point and then cancel my current job, but
> when I try to start the job again using the save point I get the error
> stating "ProgramInvocationException: JobManager did not respond within
> 60000ms". I checked ZooKeeper and everything seems fine. I also followed the
> recommendations in the following post:
> http://stackoverflow.com/questions/36625742/cant-deploy-flow-to-ha-cluster-of-apache-flink-using-flink-cli
> Is there any configuration I am missing to enable restarting of the job?
> Any help will be appreciated. Thanks.
>
> Regards,
> Anirudh
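For context, the save-then-cancel half of the workflow described in the quoted question maps onto the Flink 1.1-era CLI roughly as follows. This is only a sketch against a live cluster; the job ID shown is a placeholder, not taken from the thread:

```shell
# List running jobs to find the ID of the job to snapshot.
bin/flink list -r

# Trigger a savepoint; on success the CLI prints the savepoint path
# (with the default jobmanager backend it looks like "jobmanager://savepoints/1").
bin/flink savepoint 00000000000000000000000000000000

# Cancel the job once the savepoint has been confirmed.
bin/flink cancel 00000000000000000000000000000000
```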
Hi Ufuk,
Thank you for looking into the issue. Please find my answers below:

(1) In detached mode the configuration seems to be not picked up correctly.
That should be independent of the savepoints. Can you confirm this?
--> I tried starting a new job in detached mode and the job started on the
cluster.

(2) The program was changed in a non-compatible way after the savepoint.
Did you change the program and if yes in which way?
--> No, I did not make any change to the existing job. I tried restarting
the same job. However, I think I have found the problem: I was not
specifying the parallelism when restarting the job from the savepoint. I
assumed that this information was also captured in the savepoint. So the
non-detached mode was actually throwing the right error, but the detached
mode was not picking up the config. I guess the detached mode should also
have thrown the same exception, right?

Thanks a lot for helping.

On 10/19/16, 1:19 AM, "Ufuk Celebi" <[hidden email]> wrote:

>Hey Anirudh!
>
>As you say, this looks like two issues:
>
>(1) In detached mode the configuration seems to be not picked up
>correctly. That should be independent of the savepoints. Can you
>confirm this?
>
>(2) The program was changed in a non-compatible way after the
>savepoint. Did you change the program and if yes in which way?
>
>– Ufuk
>
>
>On Wed, Oct 19, 2016 at 12:01 AM, Anirudh Mallem
><[hidden email]> wrote:
>> Hi,
>> The issue seems to be connected with trying to restart the job in the
>> detached mode. The stack trace is as follows:
>>
>> -bash-3.2$ bin/flink run -d -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
>> file:///home/amallem/capitalone.properties
>> Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
>> Using address 10.64.119.90:33167 to connect to JobManager.
>> JobManager web interface address http://10.64.119.90:8081
>> Starting execution of program
>> Submitting Job with JobID: 6c5596772627d3c9366deaa0c47ab0ad. Returning after
>> job submission.
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: JobManager did not respond within 60000 milliseconds
>>         at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:432)
>>         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:93)
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>         at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:323)
>>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobTimeoutException: JobManager
>> did not respond within 60000 milliseconds
>>         at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:227)
>>         at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:429)
>>         ... 8 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [60000 milliseconds]
>>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>         at scala.concurrent.Await$.result(package.scala:107)
>>         at scala.concurrent.Await.result(package.scala)
>>         at org.apache.flink.runtime.client.JobClient.submitJobDetached(JobClient.java:223)
>>         ... 9 more
>>
>> When running it without the detached mode, the job manager is responding but
>> the job is failing as it thinks that the structure of the job is modified.
>>
>> -bash-3.2$ bin/flink run -s jobmanager://savepoints/1 -c
>> com.tfs.rtdp.precompute.Flink.FlinkTest
>> /tmp/flink-web-upload-2155906f-be54-47f3-b9f7-7f6f0f54f74b/448724f9-f69f-455f-99b9-c57289657e29_uber-flink-test-1.0-SNAPSHOT.jar
>> file:///home/amallem/capitalone.properties
>> Cluster configuration: Standalone cluster with JobManager at /10.64.119.90:33167
>> Using address 10.64.119.90:33167 to connect to JobManager.
>> JobManager web interface address http://10.64.119.90:8081
>> Starting execution of program
>> Submitting job with JobID: fe7bf69676a5127eecb392e3a0743c6d. Waiting for job
>> completion.
>> Connected to JobManager at
>> Actor[akka.tcp://flink@10.64.119.90:33167/user/jobmanager#-2047134036]
>> 10/18/2016 14:56:11  Job execution switched to status FAILING.
>> org.apache.flink.runtime.execution.SuppressRestartsException: Unrecoverable
>> failure. This suppresses job restarts. Please check the stack trace for the
>> root cause.
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
>> jobmanager://savepoints/1. Cannot map old state for task
>> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
>> program has been changed in a non-compatible way after the savepoint.
>>         at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
>>         ... 10 more
>> 10/18/2016 14:56:11  Source: Custom Source(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Filter(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Map(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Sink: Unnamed(1/1) switched to CANCELED
>> 10/18/2016 14:56:11  Job execution switched to status FAILED.
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Job execution failed.
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:405)
>>         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:95)
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:378)
>>         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:68)
>>         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:585)
>>         at com.tfs.rtdp.precompute.Flink.FlinkTest$.main(FlinkTest.scala:141)
>>         at com.tfs.rtdp.precompute.Flink.FlinkTest.main(FlinkTest.scala)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:320)
>>         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:777)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:253)
>>         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1005)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> execution failed.
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:822)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:768)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: org.apache.flink.runtime.execution.SuppressRestartsException:
>> Unrecoverable failure. This suppresses job restarts. Please check the stack
>> trace for the root cause.
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1305)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1291)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>         ... 2 more
>> Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
>> jobmanager://savepoints/1. Cannot map old state for task
>> 4ca3b7c2641c4299a3378fab220c4e5c to the new program. This indicates that the
>> program has been changed in a non-compatible way after the savepoint.
>>         at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:248)
>>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:983)
>>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1302)
>>         ... 10 more
>>
>> Thanks.
>>
>> From: Anirudh Mallem
>> Date: Tuesday, October 18, 2016 at 2:07 PM
>> To: "[hidden email]"
>> Subject: Issue while restarting from SavePoint
>>
>> Hi,
>> I am relatively new to Flink and I was experimenting with the save points
>> feature.
>> I have an HA cluster running with 1 Master and 4 Workers. The
>> flink-config.yaml is as follows:
>> [...]
>>
>> Regards,
>> Anirudh
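The thread's conclusion is that a savepoint in this Flink version does not record the job's parallelism, so the restore must be submitted with the same parallelism the job originally ran with. The command below is a hedged sketch of such a restore using the savepoint path and main class from the thread; the `-p` value and the jar path are placeholders, since the original parallelism is not stated in the thread:

```shell
# Resume from the savepoint. -p must repeat the parallelism the job was
# originally started with; if it differs, the restore fails with
# "Cannot map old state for task ... to the new program".
bin/flink run -s jobmanager://savepoints/1 -p 8 \
    -c com.tfs.rtdp.precompute.Flink.FlinkTest \
    uber-flink-test-1.0-SNAPSHOT.jar file:///home/amallem/capitalone.properties
```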