Thanks for reporting and analyzing this issue, Kelly. I think you are indeed running into a Flink bug.

I think the problem is the following: With Flink 1.12.0 [1] we introduced a throttling mechanism for discarding checkpoints. The way it is implemented is that once a checkpoint is discarded, it can trigger another action. In this case, that action triggers another checkpoint in the CheckpointCoordinator. The problem is that we don't properly handle the case where the CheckpointCoordinator has been stopped in the meantime (e.g. if the job has reached a terminal state). That's why we see this RejectedExecutionException, which fails the job.

This is definitely a bug and I have created this issue [2] for fixing it. I am also pulling in Roman, who worked on this feature.

Cheers,
Till

On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith <[hidden email]> wrote:

Hi folks,
I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM crashes while cancelling a job. This causes the Kubernetes readiness probes to fail and the JM to be restarted, after which it gets into a bad state while trying to recover itself using ZK + a checkpoint which no longer exists.
This is the only information being logged before the process exits:
method: uncaughtException
msg: FATAL: Thread 'cluster-io-thread-4' produced an uncaught exception. Stopping the process...
pod: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
stack: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 25977]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
    at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
I’m not sure how to debug this further, but it seems like an internal Flink bug?
More info:
- Checkpoints are stored in S3 and I’m using the S3 connector
- Identical code has been running on Flink 1.11.x for months with no issues
Thanks,
Kelly
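
For anyone following along, the failure mode Till describes can be reproduced outside Flink with plain java.util.concurrent. The sketch below is not Flink code and not the actual fix; the class name StoppedExecutorSketch and the method scheduleIfRunning are hypothetical and only illustrate the idea. It shows that handing a follow-up action to an executor that has already been shut down throws RejectedExecutionException (the exception in the stack trace above), and one possible way a caller could guard against it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

// Illustration only: names here are hypothetical, not part of Flink.
public class StoppedExecutorSketch {

    // Submits a follow-up action to an executor that was already shut down,
    // similar in spirit to a cleanup callback firing after the coordinator stopped.
    // Returns true if the submission was rejected with RejectedExecutionException.
    public static boolean submitAfterShutdownIsRejected() {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.shutdownNow();
        try {
            timer.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    // Hypothetical guard: skip the follow-up action if the executor is stopped,
    // and tolerate a shutdown that races with the check.
    // Returns true only if the action was accepted by the executor.
    public static boolean scheduleIfRunning(ScheduledExecutorService timer, Runnable action) {
        if (timer.isShutdown()) {
            return false;
        }
        try {
            timer.execute(action);
            return true;
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the check and the submit.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded submit rejected: " + submitAfterShutdownIsRejected());

        ScheduledExecutorService stopped = Executors.newSingleThreadScheduledExecutor();
        stopped.shutdownNow();
        System.out.println("guarded submit accepted: " + scheduleIfRunning(stopped, () -> { }));
    }
}
```

In the unguarded case the exception escapes on whatever thread ran the callback. In the report above that thread is 'cluster-io-thread-4', and the uncaught exception is what stops the process.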