
How to cancel a Flink DataSource from the driver code?


How to cancel a Flink DataSource from the driver code?

LINZ, Arnaud

Hello,

 

I have searched the documentation but could not find the answer: how do you cancel your SourceFunction from your "driver" code (i.e., from a monitoring thread that can initiate a proper shutdown)? Calling cancel() on the object passed to addSource() has no effect, since the call does not reach the serialized copies of that object that actually run on the cluster.
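
The effect described above can be reproduced without Flink. The following is a minimal sketch that uses plain Java serialization as a stand-in for the way a function object is shipped to workers; `SerializedCopyDemo`, `DemoSource` and `shipToWorker` are hypothetical names introduced for illustration, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializedCopyDemo {

    /** Hypothetical stand-in for a source function that carries a cancel flag. */
    static final class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        void cancel() { running = false; }

        boolean isRunning() { return running; }
    }

    /** Round-trips the object through Java serialization, producing an independent copy. */
    static DemoSource shipToWorker(DemoSource source) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoSource driverSide = new DemoSource();
        DemoSource workerSide = shipToWorker(driverSide); // copy made at "submission" time

        driverSide.cancel(); // only changes the driver-side instance

        System.out.println("driver copy running: " + driverSide.isRunning()); // false
        System.out.println("worker copy running: " + workerSide.isRunning()); // true
    }
}
```

The flag on the deserialized copy is untouched, which is why a stop signal has to travel through something the workers can observe (a cancel request to the cluster, or shared external state).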

 

Best regards,

Arnaud

 

 





The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.

Re: How to cancel a Flink DataSource from the driver code?

Stephan Ewen
Hi Arnaud!

There is a pending issue and pull request that adds a "cancel()" call to the command line interface.

It would be possible to extend that so that the driver can also cancel the program.

Greetings,
Stephan


(In reply to LINZ, Arnaud's message of Wed, Jul 1, 2015 at 3:33 PM.)


RE: How to cancel a Flink DataSource from the driver code?

LINZ, Arnaud

Hi Stephan,

 

I think that clean shutdown is a major feature for building a complex persistent service that uses Flink Streaming for a data-quality-critical task, and I'll mark my code with a // FIXME comment while waiting for this feature to become available!

 

Greetings,

Arnaud

 

 

 

(In reply to Stephan Ewen's message of Wednesday, July 1, 2015 at 15:58.)

 


Re: How to cancel a Flink DataSource from the driver code?

rmetzger0
Hi Arnaud,

When using the PersistentKafkaSource, you can always cancel the job in the web interface and start it again. We will continue reading from Kafka where you left off.
You can probably also send the cancel request to the web interface directly, using this URL: http://localhost:8081/jobsInfo?get=cancel&job=68c53a77f11d34695ac1aea4f098af82

But I don't think there is a way to submit a topology in a non-blocking way, so that env.execute() returns immediately with the JobId.
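
A stop script could issue that request from code. The sketch below only assumes the URL shape quoted above (which is itself offered as a "probably"); `WebCancelRequest`, `cancelUrl` and `sendCancel` are hypothetical names, and the timeouts are arbitrary. The job ID still has to be obtained by some other means, since env.execute() does not return it before the job finishes.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

final class WebCancelRequest {

    /** Builds the cancel URL in the form quoted in the message above. */
    static String cancelUrl(String host, int port, String jobId) {
        return "http://" + host + ":" + port + "/jobsInfo?get=cancel&job=" + jobId;
    }

    /** Sends the GET request and returns the HTTP status code. */
    static int sendCancel(String host, int port, String jobId) throws IOException {
        HttpURLConnection connection =
            (HttpURLConnection) URI.create(cancelUrl(host, port, jobId)).toURL().openConnection();
        connection.setRequestMethod("GET");
        connection.setConnectTimeout(5_000); // arbitrary timeouts for the sketch
        connection.setReadTimeout(5_000);
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Expects the job ID as the only argument; host and port match the example URL.
        int status = sendCancel("localhost", 8081, args[0]);
        System.out.println("HTTP status: " + status);
    }
}
```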


(In reply to LINZ, Arnaud's message of Thu, Jul 2, 2015 at 9:35 AM.)

 



RE: How to cancel a Flink DataSource from the driver code?

LINZ, Arnaud

Hi Robert,

 

In fact I am implementing a different use case from the one you know about, with more sources than Kafka: we now also use Flink in the BI team (which I belong to).

The problem with the web interface is that it is not easily scriptable and, as far as I understand, it does not allow cleanup code to be called upon cancellation. I would have liked to integrate with my company's standard BI production environment, which requires start, status and stop scripts.

I think I will implement such a mechanism by periodically checking in my Source for the existence of a specific "heartbeat" HDFS file, and exiting the run() method if this file no longer exists because a stop script has deleted it.
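
A minimal sketch of that heartbeat check, under stated assumptions: it uses the local filesystem through java.nio as a stand-in for HDFS, and `HeartbeatGuard` and `shouldKeepRunning` are hypothetical names. In a real source, the loop inside run() would call `shouldKeepRunning()` between records and return once it yields false.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class HeartbeatGuard {
    private final Path heartbeatFile;
    private final long checkIntervalMillis;
    private long lastCheckMillis = 0L; // forces a check on the first call
    private boolean alive = true;

    HeartbeatGuard(Path heartbeatFile, long checkIntervalMillis) {
        this.heartbeatFile = heartbeatFile;
        this.checkIntervalMillis = checkIntervalMillis;
    }

    /** Returns false once the heartbeat file has been seen missing; checks at most once per interval. */
    boolean shouldKeepRunning() {
        long now = System.currentTimeMillis();
        if (alive && now - lastCheckMillis >= checkIntervalMillis) {
            alive = Files.exists(heartbeatFile);
            lastCheckMillis = now;
        }
        return alive;
    }

    public static void main(String[] args) throws IOException {
        Path heartbeat = Files.createTempFile("heartbeat", ".tmp");
        HeartbeatGuard guard = new HeartbeatGuard(heartbeat, 0L); // 0 ms: check on every call

        int emitted = 0;
        while (guard.shouldKeepRunning()) {
            emitted++;                   // a real source would emit a record here
            if (emitted == 3) {
                Files.delete(heartbeat); // simulates the stop script
            }
        }
        // Cleanup code can run here, after the loop has exited on its own.
        System.out.println("stopped after " + emitted + " records");
    }
}
```

Throttling the check with an interval matters more against HDFS than in this local demo, because each existence test there is a remote call.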

 

Arnaud

 

(In reply to Robert Metzger's message of Thursday, July 2, 2015 at 09:48.)

 

 


Re: How to cancel a Flink DataSource from the driver code?

Stephan Ewen
Hi!

You can also cancel jobs via the command line. See here: https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/cli.html

There is also a way to do that programmatically, from Java or Scala.
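
For a stop script written in Java, one option is simply to shell out to the command line client. This sketch is not the programmatic API mentioned above; it assumes the `cancel <jobID>` action described on the linked CLI page, and `CliCancel`, `cancelCommand`, `runCancel` and the `flinkHome` install directory are hypothetical.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class CliCancel {

    /** Builds the command line; flinkHome is the (assumed) Flink installation directory. */
    static List<String> cancelCommand(String flinkHome, String jobId) {
        return Arrays.asList(flinkHome + "/bin/flink", "cancel", jobId);
    }

    /** Runs the client and returns its exit code, forwarding its output to this process. */
    static int runCancel(String flinkHome, String jobId) throws IOException, InterruptedException {
        Process process = new ProcessBuilder(cancelCommand(flinkHome, jobId))
            .inheritIO()
            .start();
        return process.waitFor();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java CliCancel <flinkHome> <jobId>
        System.exit(runCancel(args[0], args[1]));
    }
}
```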

Greetings,
Stephan


(In reply to LINZ, Arnaud's message of Wed, Jul 15, 2015 at 4:58 PM.)