Age old stop vs cancel debate

Senthil Kumar

We are on Flink 1.9.0.

 

I have a custom SourceFunction that relies on cancel() setting isRunning to false in order to exit the run loop.

My run loop essentially fetches data from S3 and then simply sleeps (Thread.sleep) for a specified time interval.

 

When a job gets cancelled, SourceFunction.cancel() is called, which sets isRunning to false.

In addition, the Thread.sleep gets interrupted, a check is made on the isRunning variable (now false), and the run loop exits.
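
A minimal sketch of the pattern described above, written in plain Java so that it runs without Flink on the classpath. The class name PollingLoop and the pollCount() helper are hypothetical; in a real job the class would implement Flink's SourceFunction, and the counter would be replaced by the S3 fetch and record emission.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of the run()/cancel() contract; not tied to any Flink class. */
class PollingLoop {
    // volatile so the write in cancel() is visible to the thread inside run()
    private volatile boolean isRunning = true;
    private final long sleepMillis;
    private final AtomicInteger polls = new AtomicInteger();

    PollingLoop(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    /** Poll, sleep, then re-check the flag. */
    void run() {
        boolean interrupted = false;
        while (isRunning) {
            polls.incrementAndGet(); // stand-in for "fetch from S3 and emit records"
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // The interrupt only wakes the thread; the flag decides whether to exit.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt status for the caller
        }
    }

    /** Called from another thread to request shutdown. */
    void cancel() {
        isRunning = false;
    }

    int pollCount() {
        return polls.get();
    }
}
```

With this shape, the loop leaves promptly only when the flag change is followed by an interrupt. Without the interrupt, the flag is not looked at again until the sleep ends.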

 

We noticed that when we “stop” the Flink job, the Thread.sleep does not get interrupted.

It also appears that SourceFunction.cancel() is not getting called (which seems like the correct behavior for “stop”).

 

My question: what’s the “right” way to exit the run() loop when the Flink job receives a stop command?

My understanding was that there was a Stoppable interface (which was removed in 1.9.0).

 

Would appreciate any insights.

 

Cheers

Kumar

Re: Age old stop vs cancel debate

rmetzger0
Hi Kumar,

The way you've implemented your custom source sounds like the right way: a "running" flag that is checked by the run() method and changed in cancel().
It is also good that you are properly handling the interrupt set by Flink (some people ignore InterruptedExceptions, which makes it difficult, basically impossible, for Flink to stop the job).

Best,
Robert


In reply to Senthil Kumar's message of Wed, May 27, 2020 at 7:38 PM.

Re: Age old stop vs cancel debate

Senthil Kumar

Hi Robert,

 

Would appreciate more insights please.

 

What we are noticing: when the Flink job is issued a stop command, the Thread.sleep does not receive an InterruptedException.

It certainly receives the exception when the Flink job is issued a cancel command.

In both cases (cancel and stop) the cancel() method is getting called (to set the isRunning variable to false).

However, because the thread is not interrupted on stop, the loop cannot check the isRunning variable until the sleep finishes.

 

 

For now, we sleep in 5-minute increments (instead of one Thread.sleep for the normal interval, which is measured in hours).

Waking up every 5 minutes gives us a chance to check the isRunning variable.
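
The short-sleep workaround could look roughly like the sketch below. It assumes the long interval is split into slices and the flag is re-checked after each one. SlicedSleeper and sleepWhileRunning are hypothetical names, and the slice length is a parameter rather than a fixed 5 minutes.

```java
import java.util.concurrent.TimeUnit;

/** Sleeps in short slices so that a cleared flag is noticed without relying on an interrupt. */
class SlicedSleeper {
    private volatile boolean isRunning = true;

    /**
     * Waits up to totalMillis, waking every sliceMillis to re-check the flag.
     * Returns true if the full interval elapsed, false if cancel() cut it short.
     */
    boolean sleepWhileRunning(long totalMillis, long sliceMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalMillis);
        while (isRunning) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return true; // interval is over and nobody cancelled
            }
            long remainingMillis = TimeUnit.NANOSECONDS.toMillis(remainingNanos);
            // Never sleep past the deadline, and never pass 0 to Thread.sleep in a tight loop.
            Thread.sleep(Math.min(sliceMillis, Math.max(1L, remainingMillis)));
        }
        return false;
    }

    void cancel() {
        isRunning = false;
    }
}
```

The worst-case shutdown delay is one slice, so the slice length trades responsiveness against the number of wake-ups.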

 

Another approach would be to save the current thread (Thread.currentThread()) before calling Thread.sleep() and to call interrupt() on it manually from the cancel() function.
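
A sketch of this second approach, again in plain Java with hypothetical names (InterruptibleLoop, runner). It assumes that nothing else running on the worker thread reacts badly to an interrupt.

```java
/** Remembers the thread inside run() so that cancel() can wake it from Thread.sleep. */
class InterruptibleLoop {
    private volatile boolean isRunning = true;
    private volatile Thread runner; // thread currently inside run(), or null

    void run(long sleepMillis) {
        runner = Thread.currentThread();
        try {
            while (isRunning) {
                // stand-in for the actual work (fetch from S3, emit records)
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    // Woken up early; the loop condition re-checks the flag.
                }
            }
        } finally {
            runner = null; // do not keep a reference to a thread that has left run()
        }
    }

    void cancel() {
        isRunning = false;   // set the flag first ...
        Thread t = runner;
        if (t != null) {
            t.interrupt();   // ... then wake the sleeper so it sees the new value
        }
    }
}
```

Setting the flag before interrupting matters: if the order were reversed, the loop could wake up, still see isRunning as true, and go back to sleep. A small window remains in which the interrupt arrives just after run() has returned, so code that later runs on the same thread has to tolerate a stray interrupt.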

 

What is your recommendation?

 

Cheers

Kumar

 

 

In reply to Robert Metzger's message of Friday, May 29, 2020 at 4:38 AM.

Re: Age old stop vs cancel debate

rmetzger0
Hi Kumar,

This is more of a Java question than a Flink question now :) If your code allows it easily, I would check the isRunning flag regularly (by using short Thread.sleep() calls) to get proper cancellation behavior.
If that makes your code very complicated, you could interrupt your worker thread manually. I would only use this method if you are sure that your code (and the libraries you are using) handle interrupts properly.
Sorry that I cannot give you a more actionable response. It depends a lot on the structure of your code and the libraries you are calling into.

Best,
Robert


In reply to Senthil Kumar's message of Fri, May 29, 2020 at 10:48 PM.

Re: Age old stop vs cancel debate

Senthil Kumar

Robert,

 

Thank you once again! We are currently doing the “short” Thread.sleep() approach. Seems to be working fine.

 

Cheers

Kumar

 

In reply to Robert Metzger's message of Tuesday, June 2, 2020 at 2:40 AM.

Re: Age old stop vs cancel debate

Kostas Kloudas-2
Hi Senthil,

From a quick look at the code, it seems that the cancel() of your
source function should be called, and the thread that it is running on
should be interrupted.

In order to pin down the problem and help us see if this is an actual
bug, could you please:
1) enable debug logging and see if you can spot some lines like this:

"Starting checkpoint (XXXX-ID) SYNC_SAVEPOINT on task XXXXX" or something
similar that mentions a synchronous savepoint,

and any other message afterwards with XXXX-ID in it, to see if the
savepoint completed successfully.

2) could you see if this behavior persists in Flink 1.10?
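
For step 1, a small helper along the following lines might save some manual searching once debug logging is enabled. It is only a sketch. The pattern is an assumption based on the example line quoted above (the real wording may differ, as noted), and SavepointLogScan and trace are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Finds the first synchronous-savepoint log line and the later lines that mention its id. */
class SavepointLogScan {
    // Assumed shape, taken from the example in this thread: "... checkpoint (<id>) SYNC_SAVEPOINT ..."
    private static final Pattern SYNC =
            Pattern.compile("checkpoint \\((\\S+?)\\) SYNC_SAVEPOINT");

    static List<String> trace(List<String> lines) {
        for (int i = 0; i < lines.size(); i++) {
            Matcher m = SYNC.matcher(lines.get(i));
            if (!m.find()) {
                continue;
            }
            // Match the id as a whole token so that id 42 does not also match 142.
            Pattern idToken = Pattern.compile("\\b" + Pattern.quote(m.group(1)) + "\\b");
            List<String> hits = new ArrayList<>();
            hits.add(lines.get(i));
            for (String later : lines.subList(i + 1, lines.size())) {
                if (idToken.matcher(later).find()) {
                    hits.add(later);
                }
            }
            return hits;
        }
        return Collections.emptyList(); // no synchronous savepoint line found
    }
}
```

The input could come from java.nio.file.Files.readAllLines on the task manager log. An empty result would suggest that the synchronous savepoint never started on that task, or that the log wording differs from the assumed pattern.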

Thanks,
Kostas

In reply to Senthil Kumar's message of Tue, Jun 2, 2020 at 4:20 PM.

Re: Age old stop vs cancel debate

Senthil Kumar
OK, will do and report back.

We are on 1.9.1; moving to 1.10 will take some time.

In reply to Kostas Kloudas's message of 6/9/20, 2:06 AM.

Re: Age old stop vs cancel debate

Kostas Kloudas-2
I understand. Thanks for looking into it, Senthil!

Kostas

In reply to Senthil Kumar's message of Tue, Jun 9, 2020 at 7:32 PM.