Hi everyone,
I have an integration test for which a use a LocalStreamEnvironment. Currently, the Flink Job is started in a separated thread, which I interrupt after some time and then do some assertions. In this situation is there a better way to stop/cancel a running job in LocalStreamEnvironment programmatically. Side-Info: The job is reading from a Kafka Cluster, which is programmatically started for each test run. Cheers, Konstantin -- Konstantin Knauf * [hidden email] * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082 signature.asc (836 bytes) Download Attachment |
Hi Konstantin,
I think this is not possible with the current API but I've been thinking about similar stuff this week. Let me quickly outline what I was thinking and then you can tell me whether that would also be helpful for you. The basic problem is this: I want to be able to write ITCases that test a (theoretically) infinite streaming job but I still have some conditions for termination that I can check in some operator/sink of the program. The problem is now that I have no way of knowing if I reached the final condition and I have no way of canceling the job once that is reached. The solution I came up with is this: - Enhance StreamExecutionEnvironment with a new method executeDetached() (or some such name) that returns a JobSubmissionResult that has methods to query the state of the running job, to cancel the running job and to (and this is important) query the accumulators of the job (live updated). - Have accumulators with special names in an operator that are used to signal a "finished" condition, i.e. something like "condition-1-success" or "condition-1-failure". - Start the test job in detached mode, periodically check the accumulators, once you have seen all required signals cancel the job and report success, if you see a failure accumulator cancel the job and report test failure. What do you think? I'm directly looping in Max and Stephan, Max had something like a detached mode client floating around a while back, I think. Cheers, Aljoscha On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf <[hidden email]> wrote: Hi everyone, |
Hi Aljoscha,
thanks for the answer. executeDetached() sounds super helpful for testing. You could basically return a Future for stopping, cancelling and so on. In my current IT I dont have a special sink, I am checking the resulting files directly, but live access to accumulators sounds very helpful nonetheless. Cheers, Konstantin On 31.08.2016 11:24, Aljoscha Krettek wrote: > Hi Konstantin, > I think this is not possible with the current API but I've been thinking > about similar stuff this week. Let me quickly outline what I was > thinking and then you can tell me whether that would also be helpful for > you. > > The basic problem is this: I want to be able to write ITCases that test > a (theoretically) infinite streaming job but I still have some > conditions for termination that I can check in some operator/sink of the > program. The problem is now that I have no way of knowing if I reached > the final condition and I have no way of canceling the job once that is > reached. > > The solution I came up with is this: > - Enhance StreamExecutionEnvironment with a new method > executeDetached() (or some such name) that returns a JobSubmissionResult > that has methods to query the state of the running job, to cancel the > running job and to (and this is important) query the accumulators of the > job (live updated). > - Have accumulators with special names in an operator that are used to > signal a "finished" condition, i.e. something like "condition-1-success" > or "condition-1-failure". > - Start the test job in detached mode, periodically check the > accumulators, once you have seen all required signals cancel the job and > report success, if you see a failure accumulator cancel the job and > report test failure. > > What do you think? > > I'm directly looping in Max and Stephan, Max had something like a > detached mode client floating around a while back, I think. > > Cheers, > Aljoscha > > On Mon, 29 Aug 2016 at 16:20 Konstantin Knauf > <[hidden email] <mailto:[hidden email]>> wrote: > > Hi everyone, > > I have an integration test for which a use a LocalStreamEnvironment. > Currently, the Flink Job is started in a separated thread, which I > interrupt after some time and then do some assertions. > > In this situation is there a better way to stop/cancel a running job in > LocalStreamEnvironment programmatically. Side-Info: The job is reading > from a Kafka Cluster, which is programmatically started for each > test run. > > Cheers, > > Konstantin > > -- > Konstantin Knauf * [hidden email] > <mailto:[hidden email]> * +49-174-3413182 > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082 > Konstantin Knauf * [hidden email] * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082 signature.asc (836 bytes) Download Attachment |
Free forum by Nabble | Edit this page |