Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job


Kelly Smith

Hi folks,

 

I recently upgraded to Flink 1.12.0 and I'm hitting an issue where my JM is crashing while cancelling a job. This causes Kubernetes readiness probes to fail and the JM to be restarted, after which it gets into a bad state while it tries to recover itself using ZK plus a checkpoint which no longer exists.

 

This is the only information being logged before the process exits:

 

 

method: uncaughtException
msg:    FATAL: Thread 'cluster-io-thread-4' produced an uncaught exception. Stopping the process...
pod:    dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
stack:  java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407 rejected from java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 25977]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
        at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
        at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
        at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
        at org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
        at org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

 

 

https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58
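For context, the rejection at the top of the trace is easy to reproduce outside of Flink: a ScheduledThreadPoolExecutor that has already been shut down is in the [Terminated] state shown in the log and rejects any new task via its AbortPolicy. A minimal standalone sketch (all names here are mine, not Flink's):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

public class Main {
    public static void main(String[] args) {
        // Stand-in for the coordinator's timer: a scheduled pool that has
        // already been shut down, i.e. the [Terminated] state from the log.
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.shutdownNow();

        try {
            // Stand-in for scheduling the next trigger request: submitting
            // any task to a terminated executor is rejected.
            timer.execute(() -> System.out.println("trigger next checkpoint"));
        } catch (RejectedExecutionException e) {
            // In the JM the exception is not caught here; it propagates to the
            // uncaught-exception handler, which exits the process.
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
    }
}
```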

 

 

I’m not sure how to debug this further, but it seems like an internal Flink bug?

 

More info:

  • Checkpoints are stored in S3 and I’m using the S3 connector
  • Identical code has been running on Flink 1.11.x for months with no issues

 

 

Thanks,

Kelly


Re: Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job

Till Rohrmann
Thanks for reporting and analyzing this issue, Kelly. I think you are indeed running into a Flink bug. The problem is the following: with Flink 1.12.0 [1] we introduced a throttling mechanism for discarding checkpoints. It is implemented so that once a checkpoint has been discarded, it can trigger a follow-up action; in this case that action triggers another checkpoint in the CheckpointCoordinator. However, we don't properly handle the case where the CheckpointCoordinator has been stopped in the meantime (e.g. because the job has reached a terminal state). That's why we see this RejectedExecutionException, which fails the job. This is definitely a bug and I have created this issue [2] for fixing it. I am also pulling in Roman, who worked on this feature.
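To illustrate the shape of the fix being described (this is a hypothetical sketch with made-up names, not the actual patch in Flink's CheckpointCoordinator): once the job has reached a terminal state and the coordinator's executor has been shut down, the follow-up trigger request should be dropped rather than allowed to escape as an uncaught exception.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

public class Main {
    // Hypothetical guard: if the executor was stopped because the job reached
    // a terminal state, drop the trigger request instead of letting the
    // RejectedExecutionException reach FatalExitExceptionHandler.
    static void scheduleTriggerRequestSafely(Executor executor, Runnable trigger) {
        try {
            executor.execute(trigger);
        } catch (RejectedExecutionException e) {
            // Benign race: the coordinator shut down concurrently with the
            // checkpoint-discard callback, so there is nothing left to trigger.
            System.out.println("coordinator stopped, dropping trigger request");
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.shutdownNow(); // job cancelled: coordinator stopped in the meantime
        scheduleTriggerRequestSafely(timer, () -> System.out.println("trigger"));
    }
}
```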


Cheers,
Till

On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith <[hidden email]> wrote:


Re: Uncaught exception in FatalExitExceptionHandler causing JM crash while canceling job

r_khachatryan
I think you're right, Till, this is the problem.
In fact, I opened a duplicate Jira ticket in parallel :)
I hope we can fix it in the next 1.12 release.

Regards,
Roman


On Fri, Jan 15, 2021 at 2:09 PM Till Rohrmann <[hidden email]> wrote: