Hi,
I have a Flink job that I can trigger a savepoint for with no problem. However, if I cancel the job and then try to run it with the savepoint, I get the following exception. Any ideas on how I can debug or fix it? I am using the exact same jar, so I did not modify the program in any manner. Using Flink version 1.1.4.
Caused by: java.lang.IllegalStateException: Failed to rollback to savepoint
jobmanager://savepoints/1. Cannot map savepoint state for operator 1692abfa98b4a67c1b7dfc17f79d35d7 to the new program, because the operator is not available in the new program. If you want to allow this, you
can set the --allowNonRestoredState option on the CLI.
at org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinator.restoreSavepoint(SavepointCoordinator.java:257)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.restoreSavepoint(ExecutionGraph.java:1020)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply$mcV$sp(JobManager.scala:1336)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1326)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$org$apache$flink$runtime$jobmanager$JobManager$$submitJob$1.apply(JobManager.scala:1326)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
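(For reference, the flag named in the message above is passed when the job is resubmitted from the savepoint, roughly like this; the savepoint path is the one from the error, and the jar name is a placeholder. Note that the flag skips, and thus discards, the state that cannot be mapped instead of failing, so it works around the symptom rather than the cause:

flink run -s jobmanager://savepoints/1 --allowNonRestoredState your-job.jar)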
Hi!
Did you change the parallelism in your program, or do the names of some functions change each time you call the program? Can you try what happens when you give explicit IDs to operators via the '.uid(...)' method (see the sketch after this message)?
Stephan

On Tue, Jan 3, 2017 at 11:44 PM, Al-Isawi Rami <[hidden email]> wrote:
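A minimal sketch of what explicit operator IDs look like (the socket source and the upper-casing mapper are placeholders invented for the example, not the actual job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitUidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> source = env
                .socketTextStream("localhost", 9999)   // placeholder source
                .uid("my-source");                     // explicit, stable operator ID

        source
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("my-mapper")   // savepoint state is matched by this ID, not a generated hash
                .print();

        env.execute("explicit-uid-example");
    }
}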
Hi Stephan,
I have not changed the parallelism, the names, or anything else in my program. It is the exact same jar file, unmodified.
I did try uid(), but I ran into this: "UnsupportedOperationException: Cannot assign user-specified hash to intermediate node in chain. This will be supported in future versions of Flink. As a work around start new chain at task Map." (A sketch of that workaround follows this message.)
Any clues on how to carry on? I am just trying to avoid the painful process of dismantling the code and testing piece by piece to get closer to the cause.
I just think that if I am providing the exact same jar, nothing should break.
Regards,
-Rami
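The workaround that exception suggests looks roughly like this: make the chained map the head of a new chain with startNewChain(), then attach the ID. A minimal sketch that drops into the pipeline from the earlier example, replacing its map step (whether this resolves the restore depends on the actual job):

DataStream<String> mapped = source
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        })
        .startNewChain()    // the map becomes the head of a new chain...
        .uid("my-mapper");  // ...so the user-specified ID/hash can be attached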
Hi Rami,
Could you maybe provide your code? You could also send it to me directly if you don't want to share it with the community. It might be that there is something in the way the pipeline is set up that causes the (generated) operator UIDs to not be deterministic (a hypothetical example is sketched below).
Best,
Aljoscha

On Sat, 7 Jan 2017 at 12:36 Rami Al-Isawi <[hidden email]> wrote:
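For illustration only, here is one hypothetical way generated operator IDs can become non-deterministic; MyRule and the whole setup are invented for the example, not taken from Rami's code. If the job graph is assembled by iterating over a collection whose iteration order is not stable across JVM runs, the operators are attached in a different order on each submission, and the auto-generated operator hashes may no longer match the savepoint:

import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NonDeterministicPipeline {

    // Deliberately no hashCode()/equals(): the HashSet falls back to identity
    // hashes, so its iteration order can differ from one JVM run to the next.
    static class MyRule implements FilterFunction<String> {
        private final String pattern;
        MyRule(String pattern) { this.pattern = pattern; }
        @Override
        public boolean filter(String value) { return value.contains(pattern); }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.socketTextStream("localhost", 9999);

        Set<MyRule> rules = new HashSet<>();
        rules.add(new MyRule("error"));
        rules.add(new MyRule("warn"));

        // The order in which the filters are attached may differ between runs,
        // so the auto-generated operator IDs may differ too -- and the
        // savepoint state can no longer be mapped back to the operators.
        for (MyRule rule : rules) {
            stream.filter(rule).print();
        }

        env.execute("non-deterministic-pipeline");
    }
}

Iterating in a fixed order (e.g. a sorted List), or better, putting an explicit .uid(...) on every stateful operator, keeps the IDs stable across submissions.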