We are on Flink 1.9.0.

I have a custom SourceFunction in which I rely on isRunning being set to false by cancel() to exit the run loop. The run loop fetches data from S3 and then sleeps (Thread.sleep) for a specified interval.

When a job is cancelled, SourceFunction.cancel() is called, which sets isRunning to false. In addition, the Thread.sleep is interrupted, the loop checks isRunning (now false), and run() exits.

We noticed that when we “stop” the Flink job, the Thread.sleep is not interrupted. It also appears that SourceFunction.cancel() is not called (which seems like the correct behavior for “stop”).

My question: what’s the “right” way to exit the run() loop when the Flink job receives a stop command? My understanding was that there used to be a Stoppable interface (which got removed in 1.9.0).

Would appreciate any insights.

Cheers
Kumar
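The source described above can be sketched roughly as follows. This is a minimal, Flink-free sketch under stated assumptions: `run(Consumer<String>)` stands in for `SourceFunction.run(SourceContext<T>)`, and `fetchFromS3()` is a hypothetical placeholder for the real S3 read. It mirrors the described behavior: one long `Thread.sleep` per iteration, left early only when the sleep is interrupted.

```java
import java.util.function.Consumer;

// Minimal sketch of the described source, without Flink on the classpath.
// Assumptions: run(Consumer<String>) stands in for SourceFunction.run(SourceContext<T>),
// and fetchFromS3() is a hypothetical placeholder for the real S3 read.
class PollingS3Source {
    // volatile because cancel() is invoked from a different thread than run()
    private volatile boolean isRunning = true;
    private final long intervalMillis;

    PollingS3Source(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    // Hypothetical placeholder for fetching one batch from S3.
    String fetchFromS3() {
        return "batch@" + System.currentTimeMillis();
    }

    public void run(Consumer<String> collector) {
        while (isRunning) {
            collector.accept(fetchFromS3());
            try {
                // One long sleep per iteration (hours in the original setup).
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Woken early (as observed here, a Flink cancel interrupts the thread).
                // Keep the interrupt status set and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    public void cancel() {
        isRunning = false;
    }
}
```

With this shape, cancel() on its own only flips the flag: if no interrupt arrives, run() returns only after the current sleep has fully elapsed.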
Hi Kumar,

The way you've implemented your custom source sounds right: a "running" flag that is checked by the run() method and changed in cancel().

It is also good that you are properly handling the interrupt set by Flink. Some people ignore InterruptedExceptions, which makes it difficult (basically impossible) for Flink to stop the job.

Best,
Robert

On Wed, May 27, 2020 at 7:38 PM Senthil Kumar <[hidden email]> wrote:
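To illustrate the point about ignored InterruptedExceptions, here is a small, Flink-independent sketch contrasting two sleep helpers (both names are hypothetical). `Thread.sleep` clears the thread's interrupt status when it throws, so swallowing the exception loses the stop signal entirely; the second helper restores the status and reports it so the caller can leave its loop.

```java
// Hypothetical helpers contrasting two ways of handling InterruptedException.
final class InterruptHandling {
    private InterruptHandling() {}

    // Anti-pattern: the interrupt status is cleared when sleep throws,
    // and swallowing the exception means nobody learns about the stop request.
    static void sleepSwallowingInterrupt(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // signal lost: the caller keeps looping as if nothing happened
        }
    }

    // Preferred: re-set the interrupt status and tell the caller to stop.
    static boolean sleepOrReportInterrupt(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A run loop built on the second helper can simply `return` when it gets `false`.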
Hi Robert,

Would appreciate more insights, please.

What we are noticing: when the Flink job is issued a stop command, the Thread.sleep does not receive an InterruptedException. It certainly receives the exception when the job is issued a cancel command.

In both cases (cancel and stop), the cancel() method is called (setting isRunning to false). However, since the thread is not interrupted on stop, we get no chance to check isRunning until the sleep finishes on its own.

For now, we call Thread.sleep in 5-minute increments (instead of the normal interval, which is measured in hours). Sleeping for 5 minutes at a time gives us a chance to check isRunning.

Another approach would be to save the current thread (Thread.currentThread()) before calling Thread.sleep() and to call Thread.interrupt() on it manually from cancel().

What is your recommendation?

Cheers
Kumar

From: Robert Metzger <[hidden email]>
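The 5-minute workaround amounts to sleeping in slices and re-checking the flag between them. A minimal sketch under the same assumptions as before (a `Consumer<String>` stands in for Flink's SourceContext, and the emitted string stands in for an S3 fetch); `checkMillis` is a hypothetical name for the slice length, which bounds how long a stop can go unnoticed.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch of the "sleep in short slices" workaround. Assumptions: Consumer<String>
// stands in for Flink's SourceContext, and the emitted string stands in for an S3 fetch.
class ChunkedSleepSource {
    private volatile boolean isRunning = true;
    private final long intervalMillis; // full polling interval (hours in the original)
    private final long checkMillis;    // slice length (5 minutes in the original)
    private int fetches = 0;           // only touched by the run() thread

    ChunkedSleepSource(long intervalMillis, long checkMillis) {
        this.intervalMillis = intervalMillis;
        this.checkMillis = checkMillis;
    }

    public void run(Consumer<String> collector) {
        while (isRunning) {
            collector.accept("batch-" + (++fetches)); // hypothetical S3 fetch
            if (!sleepWhileRunning(intervalMillis)) {
                return; // cancelled or interrupted during the wait
            }
        }
    }

    // Sleeps up to totalMillis in slices of at most checkMillis, re-checking
    // isRunning after each slice. Returns true only if the full interval elapsed.
    private boolean sleepWhileRunning(long totalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis);
        while (isRunning) {
            long remaining = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remaining <= 0) {
                return true;
            }
            try {
                Thread.sleep(Math.min(remaining, checkMillis));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // still honour an interrupting cancel
                return false;
            }
        }
        return false;
    }

    public void cancel() {
        isRunning = false;
    }
}
```

The design trade-off is explicit: the worst-case delay before run() notices the flag is one slice, paid for by waking the thread once per slice.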
Hi Kumar,

this is more a Java question than a Flink question now :) If it is easily possible in your code, I would check the isRunning flag regularly (by using short Thread.sleep() calls) to get proper cancellation behavior.

If that makes your code very complicated, you could manually interrupt your worker thread instead. I would only use this method if you are sure that your code (and the libraries you are using) handle interrupts properly.

Sorry that I cannot give you a more actionable response. It depends a lot on the structure of your code and the libraries you are calling into.

Best,
Robert

On Fri, May 29, 2020 at 10:48 PM Senthil Kumar <[hidden email]> wrote:
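If manual interruption is the better fit, one way to sketch the "save the current thread" idea is to record the thread that executes run() and interrupt it from cancel(). Names are hypothetical and the same `Consumer<String>` stand-in is used. The caveat above applies: this is only safe if everything on that thread tolerates interrupts (for example, a thread interrupted while blocked on a `java.nio` InterruptibleChannel has that channel closed). There is also a small window in which cancel() reads the thread reference just before run() returns, so the interrupt can land after the thread has left run().

```java
import java.util.function.Consumer;

// Sketch: cancel() interrupts the thread executing run(). Assumptions: Consumer<String>
// stands in for Flink's SourceContext and the emitted string for an S3 fetch.
class InterruptingSource {
    private volatile boolean isRunning = true;
    // Thread currently executing run(); null before run() starts and after it ends.
    private volatile Thread runner;
    private final long intervalMillis;

    InterruptingSource(long intervalMillis) {
        this.intervalMillis = intervalMillis;
    }

    public void run(Consumer<String> collector) {
        runner = Thread.currentThread();
        try {
            while (isRunning) {
                collector.accept("batch@" + System.currentTimeMillis()); // hypothetical fetch
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Woken by cancel() (or by Flink on a cancel): keep the status, exit.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            runner = null;
        }
    }

    public void cancel() {
        isRunning = false;
        Thread t = runner; // read once: run() may clear it concurrently
        if (t != null) {
            t.interrupt();
        }
    }
}
```

If cancel() runs before run() has recorded its thread, nothing is interrupted, but run() then sees isRunning == false on its first check and returns immediately.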
Robert,

Thank you once again! We are currently doing the “short” Thread.sleep() approach. Seems to be working fine.

Cheers
Kumar

From: Robert Metzger <[hidden email]>
Hi Senthil,
From a quick look at the code, it seems that the cancel() method of your source function should be called and the thread it is running on should be interrupted.

To pin down the problem and help us see whether this is an actual bug, could you please:

1) enable debug logging and look for lines like "Starting checkpoint (XXXX-ID) SYNC_SAVEPOINT on task XXXXX", or something similar that mentions a synchronous savepoint, plus any later message containing XXXX-ID, to see whether the savepoint completes successfully.

2) check whether this behavior persists in Flink 1.10.

Thanks,
Kostas

On Tue, Jun 2, 2020 at 4:20 PM Senthil Kumar <[hidden email]> wrote:
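For step 1, once the debug logs are collected, a small helper can pull out the synchronous-savepoint start line and every later line that mentions the same checkpoint ID. This is a hypothetical convenience, not part of Flink; the regular expression assumes the line shape quoted above and may need adjusting to your log layout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper (not a Flink API). The START pattern assumes the
// "Starting checkpoint (ID) SYNC_SAVEPOINT on task NAME" shape quoted in the thread.
final class SyncSavepointLogScan {
    private static final Pattern START =
            Pattern.compile("Starting checkpoint \\((\\S+)\\) SYNC_SAVEPOINT on task (.+)");

    private SyncSavepointLogScan() {}

    // Returns the first synchronous-savepoint start line plus every later line
    // that mentions the same checkpoint ID as a whole word.
    static List<String> traceSyncSavepoint(List<String> logLines) {
        List<String> trace = new ArrayList<>();
        Pattern idPattern = null;
        for (String line : logLines) {
            if (idPattern == null) {
                Matcher m = START.matcher(line);
                if (m.find()) {
                    // Word boundaries keep ID 17 from matching "170" or "117".
                    idPattern = Pattern.compile("\\b" + Pattern.quote(m.group(1)) + "\\b");
                    trace.add(line);
                }
            } else if (idPattern.matcher(line).find()) {
                trace.add(line);
            }
        }
        return trace;
    }
}
```

A trace that stops at the start line would suggest the savepoint made no further progress on that task, which is the kind of evidence that helps decide whether this is a bug.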
OK, will do and report back.
We are on 1.9.1; 1.10 will take some time.

On 6/9/20, 2:06 AM, "Kostas Kloudas" <[hidden email]> wrote:
I understand. Thanks for looking into it, Senthil!
Kostas

On Tue, Jun 9, 2020 at 7:32 PM Senthil Kumar <[hidden email]> wrote: