We had an unusual situation last night. One of our Flink clusters experienced some connectivity issues, which led to the single job running on the cluster failing and then being restored.
And then something odd happened: the cluster also decided to restore an old version of the job, one we were running a month ago. That job was canceled on June 5 with a savepoint:

June 5th 2018, 15:00:43.865 Trying to cancel job c59dd3133b1182ce2c05a5e2603a0646 with savepoint to s3://bucket/flink/foo/savepoints
June 5th 2018, 15:00:44.438 Savepoint stored in s3://bucket/flink/foo/savepoints/savepoint-c59dd3-f748765c67df. Now cancelling c59dd3133b1182ce2c05a5e2603a0646.
June 5th 2018, 15:00:44.438 Job IOC Engine (c59dd3133b1182ce2c05a5e2603a0646) switched from state RUNNING to CANCELLING.
June 5th 2018, 15:00:44.495 Job IOC Engine (c59dd3133b1182ce2c05a5e2603a0646) switched from state CANCELLING to CANCELED.
June 5th 2018, 15:00:44.507 Removed job graph c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper.
June 5th 2018, 15:00:44.508 Removing /flink/foo/checkpoints/c59dd3133b1182ce2c05a5e2603a0646 from ZooKeeper
June 5th 2018, 15:00:44.732 Job c59dd3133b1182ce2c05a5e2603a0646 has been archived at s3://bucket/flink/foo/archive/c59dd3133b1182ce2c05a5e2603a0646.

But then yesterday:

June 19th 2018, 17:55:31.917 Attempting to recover job c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.155 Recovered SubmittedJobGraph(c59dd3133b1182ce2c05a5e2603a0646, JobInfo(clients: Set((Actor[akka.tcp://[hidden email]:42823/temp/$c],DETACHED)), start: 1524514537697)).
June 19th 2018, 17:55:32.157 Submitting job c59dd3133b1182ce2c05a5e2603a0646 (Some Job) (Recovery).
June 19th 2018, 17:55:32.157 Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=30000) for c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.157 Submitting recovered job c59dd3133b1182ce2c05a5e2603a0646.
June 19th 2018, 17:55:32.158 Running initialization on master for job Some Job (c59dd3133b1182ce2c05a5e2603a0646).
June 19th 2018, 17:55:32.165 Initialized in '/checkpoints/c59dd3133b1182ce2c05a5e2603a0646'.
June 19th 2018, 17:55:32.170 Job Some Job (c59dd3133b1182ce2c05a5e2603a0646) switched from state CREATED to RUNNING.
June 19th 2018, 17:55:32.170 Scheduling job c59dd3133b1182ce2c05a5e2603a0646 (Some Job).

Has anyone seen anything like this? Any ideas what the cause may have been?

My guess is that the state in ZK or S3 was somehow corrupted when the job was previously shut down. Then, when yesterday's networking problems led to the cancel and restore of the currently running job, the restore logic scanned ZK or S3 for jobs to restore, came across the old job with bad state, and brought it back to life.

Is there any way to scan ZooKeeper or S3 for such jobs?
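A minimal sketch of the ZooKeeper half of such a scan, in Python. It assumes the HA job graphs live under the default `high-availability.zookeeper.path.jobgraphs` (`/jobgraphs`) beneath the `/flink/foo` root seen in the logs, i.e. `/flink/foo/jobgraphs`, with one child znode per job ID. `find_stale_job_graphs` is a hypothetical helper, not part of Flink; it only compares the job IDs found there against the jobs you expect to be running:

```python
import re

# Flink job IDs are 32 lowercase hex characters,
# e.g. c59dd3133b1182ce2c05a5e2603a0646.
JOB_ID = re.compile(r"^[0-9a-f]{32}$")


def find_stale_job_graphs(znode_children, expected_job_ids):
    """Return job IDs that have a job-graph znode but are not expected to run.

    znode_children: child names listed under the HA job-graph path
        (assumed here to be /flink/foo/jobgraphs).
    expected_job_ids: IDs of the jobs you believe should be running.
    """
    # Normalize expected IDs so upper/lower case input compares equal.
    expected = {job_id.lower() for job_id in expected_job_ids}
    stale = []
    for name in znode_children:
        # Defensively skip any child that does not look like a job ID.
        if not JOB_ID.match(name):
            continue
        if name not in expected:
            stale.append(name)
    return sorted(stale)
```

To feed it real data, the children can be listed with ZooKeeper's `zkCli.sh` (`ls /flink/foo/jobgraphs`) or with the third-party kazoo client's `KazooClient.get_children`. Any ID it returns, such as c59dd3133b1182ce2c05a5e2603a0646 here, is presumably a leftover job graph that a newly elected JobManager would try to recover.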
The source of the issue may be this error, which occurred when the job was being canceled on June 5:

June 5th 2018, 14:59:59.430 Failure during cancellation of job c59dd3133b1182ce2c05a5e2603a0646 with savepoint.
java.io.IOException: Failed to create savepoint directory at --checkpoint-dir
    at org.apache.flink.runtime.checkpoint.savepoint.SavepointStore.createSavepointDirectory(SavepointStore.java:106)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.triggerSavepoint(CheckpointCoordinator.java:376)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:577)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On Wed, Jun 20, 2018 at 9:31 AM Elias Levy <[hidden email]> wrote:
Alas, that error appears to be a red herring. The admin mistyped the cancel command, which caused the error, but immediately corrected it, and the job was then canceled. So it seems unrelated to the job coming back to life later on.

On Wed, Jun 20, 2018 at 10:04 AM Elias Levy <[hidden email]> wrote: