Unit / Integration Test Timer

Unit / Integration Test Timer

Ashish Pokharel
All,

Hopefully a quick one. I feel like I have seen this answered a few times before but can't find an appropriate example. I am trying to run a few tests where registered timeouts are invoked (snippet below). The simple example shown in the documentation for integration tests (using flink-test-utils) seems to complete even though timers are registered and have not been invoked.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    CollectSink.values.clear();
    // create a stream of custom elements and apply transformations
    env.fromCollection(t.getTestInputs())
            .process(new TupleProcessFn())
            .keyBy(FactTuple::getKey)
            .process(new NormalizeDataProcessFn(2))
            .addSink(getSink());

    env.execute();

I have a 2-second processing-time timer registered. If I put a breakpoint in the first function, TupleProcessFn(), after a few tuples are collected, I can see onTimer being invoked. So what is the trick here? I went as far as adding a MapFunction with a sleep after the second process function, to no avail.

Thanks,

Ashish 

Re: Unit / Integration Test Timer

Till Rohrmann
Hi Ashish,

How do you make sure that all of your data is not consumed within a fraction of the 2 seconds? For this it would be better to use event time, which allows you to control how time passes. If you want to test a specific operator, you could try out the One/TwoInputStreamOperatorTestHarness.
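
To illustrate what "controlling how time passes" can look like in a test, here is a minimal plain-Java sketch. It is not Flink code: ManualTimerService, registerTimer and advanceTo are hypothetical names made up for this illustration, and the key "sensor-1" is invented. The idea is only that the test, not the wall clock, decides when a 2-second timeout becomes due.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ManualClockDemo {

    /** Hypothetical manually driven timer service; an illustration, not a Flink class. */
    static final class ManualTimerService {
        // Pending timers ordered by timestamp; several callbacks may share a timestamp.
        private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();
        private long now = 0L;

        void registerTimer(long timestamp, Runnable callback) {
            timers.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(callback);
        }

        /** Moves the clock forward and fires every timer due at or before newTime. */
        void advanceTo(long newTime) {
            now = Math.max(now, newTime);
            while (!timers.isEmpty() && timers.firstKey() <= now) {
                Map.Entry<Long, List<Runnable>> due = timers.pollFirstEntry();
                due.getValue().forEach(Runnable::run);
            }
        }
    }

    /** Arms a 2000 ms timeout and records what has fired as the test advances time. */
    static List<String> run() {
        ManualTimerService clock = new ManualTimerService();
        List<String> output = new ArrayList<>();

        // An element for the (invented) key "sensor-1" arrives at t=0 and arms a timeout.
        clock.registerTimer(2_000L, () -> output.add("timeout:sensor-1"));

        // Just before the deadline nothing has fired yet.
        clock.advanceTo(1_999L);
        output.add("fired-before-deadline=" + output.size());

        // Reaching the deadline fires the timeout, with no real waiting involved.
        clock.advanceTo(2_000L);
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the clock only moves when the test says so, the "negative" timeout case becomes deterministic and does not depend on how quickly the input is consumed.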

Cheers,
Till

(In reply to ashish pok's message of Fri, Sep 14, 2018 at 3:36 PM.)

Re: Unit / Integration Test Timer

Ashish Pokharel
Hi Till,

To answer your first question, I currently don't (and honestly I am not sure how, other than using a breakpoint in the IDE, or perhaps something like Mockito). So did I interpret it correctly that an execution env started using flink-test-utils will essentially tear down once it consumes the last data point (i.e. the end of the collection I am passing), even though there could be active timers registered?

Further, most of our pipelines use low-level process functions. We toyed around with other windowing and session functions, but process functions gave us the most flexibility (at least at this point, until we can revisit), and we generate keys for aggregation/windowing somewhere upstream (say in a map, flatMap or another process function). That means some pipelines are event/processing time agnostic in a sense, although technically we still have timers registered within the process functions. This helped us with unbounded keys and with sensor data that could potentially be backfilled (i.e. watermarks have long since passed). I wouldn't doubt a bit that there are better solutions :)

With that background, I am not quite following your second note about event time and how we can leverage it for testing. Our intent is to create sampled input from results and compare test output to those results (i.e. end-to-end integration tests) as part of our CI/CD. The normal flow seems to work well; just getting the "negative" test cases for timeouts to work seems to be a mystery right now :) So single-operator harnesses don't sound like the right approach. Let me know otherwise.

Thanks,


(In reply to Till Rohrmann's message of Friday, September 14, 2018, 11:42:17 AM EDT.)

Re: Unit / Integration Test Timer

Till Rohrmann
Hi Ashish,

I think you are right. In the current master, the system should wait until all timers have completed before terminating. Could you check whether this is the case? If not, then this might indicate a problem. Which version of Flink are you using? I guess it would also be helpful to have access to a working example where the problem is visible.
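
As a rough analogy for the two shutdown behaviours discussed here (tearing down as soon as the bounded input is consumed versus waiting for pending timers), here is a plain-Java sketch using a ScheduledExecutorService. It is not how Flink implements timers; the class and method names (TimerShutdownDemo, timerFires) are made up for illustration, and the 200 ms delay just stands in for the 2-second timeout.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TimerShutdownDemo {

    /**
     * Schedules one timer, treats the bounded input as already consumed,
     * then shuts the timer service down. Returns true if the timer ran.
     */
    static boolean timerFires(long timerDelayMs, boolean waitForTimers) {
        ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        AtomicBoolean fired = new AtomicBoolean(false);

        timerService.schedule(() -> fired.set(true), timerDelayMs, TimeUnit.MILLISECONDS);

        // The input is exhausted long before the timer is due.
        if (waitForTimers) {
            // Graceful: accept no new timers, but let the pending one run.
            timerService.shutdown();
            await(timerService, timerDelayMs + 5_000L);
        } else {
            // Immediate teardown: the pending timer is discarded.
            timerService.shutdownNow();
            await(timerService, 1_000L);
        }
        return fired.get();
    }

    private static void await(ScheduledExecutorService service, long millis) {
        try {
            service.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("teardown at end of input: fired=" + timerFires(200, false));
        System.out.println("wait for pending timers:  fired=" + timerFires(200, true));
    }
}
```

In the first call the timer is dropped, which matches what Ashish describes seeing; in the second the shutdown waits for it, which is the behaviour expected from current master.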

Cheers,
Till

(In reply to ashish pok's message of Fri, Sep 14, 2018 at 7:57 PM.)

Re: Unit / Integration Test Timer

Ashish Pokharel
Hi Till,

I am still on version 1.4.2 and will need some time before we can get a later version certified in our Prod env. Timers are definitely not completing in my tests with the 1.4.2 utils, though I can see them being registered in the debugger. Having said that, should I pull only the latest test utils and try it out?

Creating a simple example shouldn't take that long; I can create one sometime this week.

Thanks, Ashish

(In reply to Till Rohrmann's message of Monday, September 17, 2018, 3:53:59 AM EDT.)

Re: Unit / Integration Test Timer

Ashish Pokharel
Hi Till,

A quick update. I added a mock FlatMap function (MockStreamPause) with the following logic:

    public MockStreamPause(int pauseSeconds) {
        this.pauseSeconds = pauseSeconds;
    }

    @Override
    public void flatMap(PlatformEvent event, Collector<PlatformEvent> out) throws Exception {
        if (event.getSrc().startsWith(EventTupleGeneratorUtil.PAUSE_EVENT_SRC_PREFIX)) {
            // Pause marker event: stall the stream instead of forwarding it,
            // so that pending timers get a chance to fire before the input ends.
            if (pauseSeconds > 0) {
                try {
                    Thread.sleep(pauseSeconds * 1000L);
                } catch (InterruptedException intEx) {
                    logger.info("Mock pause interrupted", intEx);
                }
            }
        } else {
            // Regular event: pass it through unchanged.
            out.collect(event);
        }
    }

This seems to let me move forward with testing. Let me know if you recommend using the latest test utils with the 1.4.2 core as a test.

Thanks, Ashish

(In reply to ashish pok's message of Monday, September 17, 2018, 9:33:56 AM EDT.)