Gracefully Stopping Streaming Job Programmatically in LocalStreamEnvironment

snntr
Hi everyone,

I have an integration test for which I use a LocalStreamEnvironment.
Currently, the Flink job is started in a separate thread, which I
interrupt after some time before running some assertions.

In this situation, is there a better way to stop/cancel a running job in
a LocalStreamEnvironment programmatically? Side info: the job reads
from a Kafka cluster, which is started programmatically for each test run.
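For illustration, the thread-and-interrupt approach described above might look like the following minimal, stdlib-only sketch. `blockingJob` is a hypothetical stand-in for `env.execute()` that simply blocks until its thread is interrupted; whether interrupting the thread actually stops a real Flink job depends on the Flink version, so this only shows the test-side pattern.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptJobSketch {

    // Hypothetical stand-in for env.execute(): blocks like an unbounded
    // streaming job and returns only when its thread is interrupted.
    static void blockingJob(AtomicBoolean stoppedByInterrupt) {
        try {
            while (true) {
                Thread.sleep(10); // simulate processing records
            }
        } catch (InterruptedException e) {
            stoppedByInterrupt.set(true);
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
    }

    // Runs the job in a separate thread, lets it run for runMillis, then
    // interrupts it and waits (bounded) for the thread to finish.
    // Returns true if the job saw the interrupt and its thread terminated.
    static boolean runAndInterrupt(long runMillis) {
        AtomicBoolean stopped = new AtomicBoolean(false);
        Thread jobThread = new Thread(() -> blockingJob(stopped), "job-thread");
        jobThread.start();
        try {
            TimeUnit.MILLISECONDS.sleep(runMillis); // "after some time"
            jobThread.interrupt();
            jobThread.join(5_000); // bounded wait so a stuck job cannot hang the test
        } catch (InterruptedException e) {
            jobThread.interrupt();
            Thread.currentThread().interrupt();
            return false;
        }
        // assertions on the job's output would go here
        return stopped.get() && !jobThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(runAndInterrupt(100)); // prints true
    }
}
```

The weakness of this pattern is the fixed sleep: the test cannot tell whether the job has actually produced what it is supposed to check.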

Cheers,

Konstantin

--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Gracefully Stopping Streaming Job Programmatically in LocalStreamEnvironment

Aljoscha Krettek
Hi Konstantin,
I think this is not possible with the current API but I've been thinking about similar stuff this week. Let me quickly outline what I was thinking and then you can tell me whether that would also be helpful for you.

The basic problem is this: I want to be able to write ITCases that test a (theoretically) infinite streaming job, but I still have some conditions for termination that I can check in some operator/sink of the program. The problem is that I currently have no way of knowing whether I have reached the final condition, and no way of canceling the job once it is reached.

The solution I came up with is this:
 - Enhance StreamExecutionEnvironment with a new method executeDetached() (or some such name) that returns a JobSubmissionResult that has methods to query the state of the running job, to cancel the running job and to (and this is important) query the accumulators of the job (live updated).
 - Have accumulators with special names in an operator that are used to signal a "finished" condition, e.g. something like "condition-1-success" or "condition-1-failure".
 - Start the test job in detached mode and periodically check the accumulators. Once you have seen all required signals, cancel the job and report success; if you see a failure accumulator, cancel the job and report test failure.
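The polling loop from the last step could be sketched roughly as follows. Since executeDetached() is only a proposal here, the `DetachedJob` interface with `getAccumulators()` and `cancel()` is a hypothetical handle, not an existing Flink API, and `FakeJob` is an in-memory stand-in so the sketch runs without a cluster. Treating any accumulator name ending in "-failure" as a failure signal is an assumption based on the naming example above.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class AccumulatorPollingSketch {

    // Hypothetical handle returned by the proposed executeDetached();
    // the method names are assumptions, not an existing Flink API.
    interface DetachedJob {
        Map<String, Object> getAccumulators(); // live accumulator snapshot
        void cancel();
    }

    enum Outcome { SUCCESS, FAILURE, TIMEOUT }

    // Polls the accumulators until all required success signals are present,
    // any "-failure" signal appears, or the timeout expires.
    // The job is cancelled on every path.
    static Outcome awaitConditions(DetachedJob job, Set<String> requiredSuccess,
                                   long timeoutMillis, long pollMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (System.currentTimeMillis() < deadline) {
                Set<String> seen = job.getAccumulators().keySet();
                for (String name : seen) {
                    if (name.endsWith("-failure")) {
                        return Outcome.FAILURE;
                    }
                }
                if (seen.containsAll(requiredSuccess)) {
                    return Outcome.SUCCESS;
                }
                Thread.sleep(pollMillis);
            }
            return Outcome.TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.TIMEOUT;
        } finally {
            job.cancel(); // the job is infinite, so always stop it before reporting
        }
    }

    // Hypothetical in-memory job whose "operators" write accumulators directly.
    static class FakeJob implements DetachedJob {
        final Map<String, Object> accumulators = new ConcurrentHashMap<>();
        volatile boolean cancelled = false;
        public Map<String, Object> getAccumulators() { return accumulators; }
        public void cancel() { cancelled = true; }
    }

    public static void main(String[] args) {
        FakeJob job = new FakeJob();
        job.accumulators.put("condition-1-success", 1);
        Outcome outcome = awaitConditions(job, Set.of("condition-1-success"), 1_000, 10);
        System.out.println(outcome + " cancelled=" + job.cancelled); // prints SUCCESS cancelled=true
    }
}
```

Checking for failure signals before success signals means a job that reports both is treated as failed, and cancelling in `finally` also stops the job when the test times out.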

What do you think?

I'm directly looping in Max and Stephan. Max had something like a detached-mode client floating around a while back, I think.

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf <[hidden email]> wrote:

Re: Gracefully Stopping Streaming Job Programmatically in LocalStreamEnvironment

snntr
Hi Aljoscha,

thanks for the answer. executeDetached() sounds super helpful for
testing. You could basically return a Future for stopping, cancelling
and so on.

In my current IT I don't have a special sink; I am checking the resulting
files directly, but live access to accumulators sounds very helpful
nonetheless.
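Until something like executeDetached() exists, a test that checks the resulting files can at least replace the fixed sleep with polling and hold a `java.util.concurrent.Future` for cancellation, in the spirit of the Future idea above. In this minimal sketch, `fakeJob` is a hypothetical stand-in for the streaming job whose sink writes one file per record; whether cancelling the thread stops a real Flink job is not covered here.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class FileCheckingSketch {

    // Hypothetical stand-in for the streaming job: its "sink" writes one
    // file per record into outputDir until the thread is interrupted.
    static void fakeJob(Path outputDir) {
        try {
            for (int i = 0; !Thread.currentThread().isInterrupted(); i++) {
                Files.writeString(outputDir.resolve("part-" + i), "record-" + i);
                Thread.sleep(5);
            }
        } catch (InterruptedException | IOException e) {
            // Interrupted while sleeping, or ClosedByInterruptException
            // (an IOException) while writing: both mean "job stopped".
        }
    }

    static long countFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.count();
        }
    }

    // Submits the job to an executor so the test holds a cancellable Future,
    // polls the output directory until expectedFiles exist or the timeout
    // expires, then cancels the job. Returns true if enough files appeared.
    static boolean runUntilFiles(Path outputDir, long expectedFiles, long timeoutMillis)
            throws IOException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<?> job = executor.submit(() -> fakeJob(outputDir));
        try {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            while (System.currentTimeMillis() < deadline) {
                if (countFiles(outputDir) >= expectedFiles) {
                    return true;
                }
                Thread.sleep(10);
            }
            return false;
        } finally {
            job.cancel(true); // interrupts the thread running the job
            executor.shutdownNow();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("it-output");
        System.out.println(runUntilFiles(dir, 10, 2_000));
    }
}
```

Polling with a deadline lets the test finish as soon as the expected output exists, while still failing (rather than hanging) if it never appears.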

Cheers,
Konstantin


On 31.08.2016 11:24, Aljoscha Krettek wrote:

> Hi Konstantin,
> I think this is not possible with the current API but I've been thinking
> about similar stuff this week. Let me quickly outline what I was
> thinking and then you can tell me whether that would also be helpful for
> you.
>
> The basic problem is this: I want to be able to write ITCases that test
> a (theoretically) infinite streaming job but I still have some
> conditions for termination that I can check in some operator/sink of the
> program. The problem is now that I have no way of knowing if I reached
> the final condition and I have no way of canceling the job once that is
> reached.
>
> The solution I came up with is this:
>  - Enhance StreamExecutionEnvironment with a new method
> executeDetached() (or some such name) that returns a JobSubmissionResult
> that has methods to query the state of the running job, to cancel the
> running job and to (and this is important) query the accumulators of the
> job (live updated).
>  - Have accumulators with special names in an operator that are used to
> signal a "finished" condition, i.e. something like "condition-1-success"
> or "condition-1-failure".
>  - Start the test job in detached mode, periodically check the
> accumulators, once you have seen all required signals cancel the job and
> report success, if you see a failure accumulator cancel the job and
> report test failure.
>
> What do you think?
>
> I'm directly looping in Max and Stephan, Max had something like a
> detached mode client floating around a while back, I think.
>
> Cheers,
> Aljoscha
>
> On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf
> <[hidden email] <mailto:[hidden email]>> wrote:
>
>     Hi everyone,
>
>     I have an integration test for which a use a LocalStreamEnvironment.
>     Currently, the Flink Job is started in a separated thread, which I
>     interrupt after some time and then do some assertions.
>
>     In this situation is there a better way to stop/cancel a running job in
>     LocalStreamEnvironment programmatically. Side-Info: The job is reading
>     from a Kafka Cluster, which is programmatically started for each
>     test run.
>
>     Cheers,
>
>     Konstantin
>
>     --
>     Konstantin Knauf * [hidden email]
>     <mailto:[hidden email]> * +49-174-3413182
>     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>     Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>     Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
--
Konstantin Knauf * [hidden email] * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


signature.asc (836 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Gracefully Stopping Streaming Job Programmatically in LocalStreamEnvironment

zt1983811
The suggestion sounds great. I am wondering when this feature will be implemented.