problem with increase job parallelism

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

problem with increase job parallelism

Lei Chen-2
Hi, 

We're trying to implement some module to help autoscale our pipeline which is built  with Flink on YARN. According to the document, the suggested procedure seems to be:

1. cancel job with savepoint
2. start new job with increased YARN TM number and parallelism. 

However, step 2 always gave error 

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The procedure worked fine if parallelism was not changed. 

Also want to mention that I didn't manually specify OperatorID in my job. The document does mentioned manually OperatorID assignment is suggested, just curious is that mandatory in my case to fix the problem I'm seeing, given that my program doesn't change at all so the autogenerated operatorID should be unchanged after parallelism increase?

thanks,
Lei
Reply | Threaded
Open this post in threaded view
|

Re: problem with increase job parallelism

Fabian Hueske-2
Hi Lei,

setting explicit operator ID should solve this issue.
As far as I know, the auto-generated operator id also depended on the operator parallelism in previous versions of Flink (not sure until which point).

Which version are you running?

Best, Fabian


2017-10-17 3:15 GMT+02:00 Lei Chen <[hidden email]>:
Hi, 

We're trying to implement some module to help autoscale our pipeline which is built  with Flink on YARN. According to the document, the suggested procedure seems to be:

1. cancel job with savepoint
2. start new job with increased YARN TM number and parallelism. 

However, step 2 always gave error 

Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint hdfs://10.106.238.14:/tmp/savepoint-767421-20907d234655. Cannot map savepoint state for operator 37dfe905df17858e07858039ce3d8ae1 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader.loadAndValidateSavepoint(SavepointLoader.java:130)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1140)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1386)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1372)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The procedure worked fine if parallelism was not changed. 

Also want to mention that I didn't manually specify OperatorID in my job. The document does mentioned manually OperatorID assignment is suggested, just curious is that mandatory in my case to fix the problem I'm seeing, given that my program doesn't change at all so the autogenerated operatorID should be unchanged after parallelism increase?

thanks,
Lei