Published test artifacts for flink streaming

Nick Dimiduk
Hello,

I'm attempting integration tests for my streaming flows. I'd like to
produce an input stream of Java objects and sink the results into a
collection for verification via JUnit asserts.
StreamExecutionEnvironment provides methods for the former; however,
how to achieve the latter is not evident from my internet
searching. I think I've found a solution in the TestStreamEnvironment
class, i.e., as used by WindowingIntegrationTest. However, this class
appears to be packaged in the flink-streaming-core test artifact,
which is not published to Maven.

For reference, this is the maven dependency stanza I'm using. Please
let me know if I've got it wrong.

Thanks,
Nick

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-core</artifactId>
      <version>${flink.version}</version>
      <classifier>test</classifier>
      <scope>test</scope>
    </dependency>

Re: Published test artifacts for flink streaming

rmetzger0
Hi Nick,

We usually publish the test artifacts. Can you try replacing the <classifier> tag with <type>test-jar</type>:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-core</artifactId>
      <version>${flink.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

Re: Published test artifacts for flink streaming

Nick Dimiduk
Hi Robert,

It seems "type" was what I needed. It also looks like the test
jar has an undeclared dependency. In the end, the following allowed me
to use TestStreamEnvironment for my integration test. Thanks a lot!

-n

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-core</artifactId>
      <version>${flink.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>

Re: Published test artifacts for flink streaming

Stephan Ewen
Hey!

There is also a collect() sink in the "flink-streaming-contrib" project, see here:
https://github.com/apache/flink/blob/master/flink-contrib/flink-streaming-contrib/src/main/java/org/apache/flink/contrib/streaming/DataStreamUtils.java

It should work well locally for testing. In that case you can write a program as usual and use "DataStreamUtils.collect(stream)"; you just need to stop reading it once you know the stream is exhausted...
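That consumption pattern can be sketched in plain Java. The stream is stubbed out with an ordinary Iterator here, since DataStreamUtils.collect(stream) hands back an Iterator over the stream's elements; the stand-in data and the expectedCount cutoff are invented for the example:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CollectDrain {
    // Drain an iterator (stand-in for DataStreamUtils.collect(stream)) into a list,
    // stopping after expectedCount elements so a finite test does not block forever
    // pulling from a stream that may be unbounded.
    static <T> List<T> drain(Iterator<T> results, int expectedCount) {
        List<T> collected = new ArrayList<>();
        while (collected.size() < expectedCount && results.hasNext()) {
            collected.add(results.next());
        }
        return collected;
    }

    public static void main(String[] args) {
        // Stand-in for the collected stream output.
        Iterator<String> fakeStream = List.of("a", "b", "c").iterator();
        System.out.println(drain(fakeStream, 3)); // prints [a, b, c]
    }
}
```

The explicit element-count cutoff is the "stop reading once you know the stream is exhausted" part; against a real unbounded stream, hasNext() alone would block.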

Stephan


Re: Published test artifacts for flink streaming

Nick Dimiduk
Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-JVM example tests for those of us getting started. Following the example of WindowingIntegrationTest mostly works, though reusing my single sink instance with its static collection yields non-deterministic results: there appears to be a race between instances clearing the collection in their open method and the runtime returning the collection to my test harness.
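One way to sidestep the race described above is to reset the shared collection exactly once from the test, before the job runs, rather than in each parallel instance's open(). A minimal plain-Java sketch of that pattern (the CollectingSink class and its invoke method are invented stand-ins for a Flink SinkFunction):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectingSink {
    // Shared, synchronized buffer that all parallel sink instances append to.
    static final List<String> RESULTS = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for SinkFunction#invoke. Crucially, there is no open() that clears
    // RESULTS, so a late-opening parallel instance cannot wipe earlier output.
    void invoke(String value) {
        RESULTS.add(value);
    }

    public static void main(String[] args) {
        RESULTS.clear(); // reset once, from the test, before the job runs
        CollectingSink s1 = new CollectingSink();
        CollectingSink s2 = new CollectingSink();
        s1.invoke("a");
        s2.invoke("b"); // a second parallel instance must not erase "a"
        System.out.println(RESULTS.size()); // prints 2
    }
}
```

With the clear moved into the test harness, the only remaining concurrency concern is the synchronized append, which is safe regardless of how many sink instances open.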

I'd also appreciate some guidance on stream composition. It's nice to use the fluent API when exploring data in a shell, but that API seems cumbersome when composing data pipelines from reusable partials. Or maybe I'm doing it all wrong... Hence the request for more examples :)

While I'm asking, how might you model this: I have a set of predicates I'd like to flatMap over a stream. An input item should be compared against every predicate (basically, I want a Clojure juxt of predicates over each stream element). Imagine those predicates expressed as where clauses via the Table API. Say I have hundreds of thousands of these predicates to run over every stream event. Is the Java client API rich enough to express such a flow, or should I examine something lower-level than DataStream?

Thanks a lot, and sorry for all the newb questions.
-n

Re: Published test artifacts for flink streaming

Till Rohrmann

Hi Nick,

I think a flatMap operation which is instantiated with your list of predicates should do the job. Thus, there shouldn't be a need to dig deeper than the DataStream API for a first version.
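For illustration, here is a minimal plain-Java sketch of that idea (no Flink dependency shown; in a real job the loop would live inside a FlatMapFunction's flatMap, with the collector emitting one match record per hit — the predicate names here are invented):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class PredicateJuxt {
    // A "juxt" of predicates: every input element is tested against every
    // predicate, and one output entry is emitted per matching predicate.
    static List<String> matchingPredicates(Map<String, Predicate<Integer>> predicates, int element) {
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Predicate<Integer>> e : predicates.entrySet()) {
            if (e.getValue().test(element)) {
                matches.add(e.getKey()); // in Flink: out.collect(...) instead
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, Predicate<Integer>> predicates = new LinkedHashMap<>();
        predicates.put("positive", x -> x > 0);
        predicates.put("even", x -> x % 2 == 0);
        predicates.put("large", x -> x > 100);

        System.out.println(matchingPredicates(predicates, 7)); // prints [positive]
    }
}
```

With hundreds of thousands of predicates, the per-element loop dominates the cost, so indexing the predicates (say, by the attribute each one inspects) may be worth exploring before dropping below the DataStream API.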

Cheers,
Till


Re: Published test artifacts for flink streaming

Nick Dimiduk
Promising observation, Till. Is it possible to access the Table API's
select and where operators from within such a flatMap?

-n

Re: Published test artifacts for flink streaming

Till Rohrmann

No, that is not possible, since you cannot access DataSets from inside UDFs, and the select and where operations are translated into filter operations on a DataSet.


Re: Published test artifacts for flink streaming

Nick Dimiduk
On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <[hidden email]> wrote:
> Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-jvm example tests for those of us getting started. Following the example of WindowingIntegrationTest is mostly working, though reusing my single sink instance with its static collection results in non-deterministic results; there appears to be a race between instances clearing the collection in their open method and the runtime returning the collection to my test harness.

This inconsistent test result is pretty frustrating. I've created a sample project with an integration test that demonstrates the issue. Run `mvn test` multiple times and see that it sometimes passes and sometimes fails. Maybe someone has some thoughts?


Thanks,
Nick

Re: Published test artifacts for flink streaming

Stephan Ewen
There is no global order in parallel streams; it is something that applications need to work with. We are thinking about adding operations to introduce event-time order (at the cost of some delay), but those are only plans at this point.

What I do in my tests is run the test streams in parallel, but the sink with a parallelism (DOP) of 1. The sink gathers the elements in a list, and the close() function validates the result.

Validating the results may involve sorting the list into which the elements were gathered (to make the order deterministic), or using a hash set if it is only about distinct counts.

Hope that helps.
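The normalize-before-asserting idea might look like the following plain-Java sketch, where the hard-coded lists stand in for whatever the DOP-1 sink gathered on two different runs:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class DeterministicAssert {
    // Parallel execution can deliver elements in any order, so sort a copy of the
    // gathered list before comparing it to the expected result.
    static List<Integer> normalized(List<Integer> gathered) {
        List<Integer> copy = new ArrayList<>(gathered);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        // Two runs of the same job, with elements arriving in different orders:
        List<Integer> runA = List.of(3, 1, 2);
        List<Integer> runB = List.of(2, 3, 1);
        System.out.println(normalized(runA).equals(normalized(runB))); // prints true
    }
}
```

When duplicates do not matter and only distinct values are asserted, comparing `new HashSet<>(gathered)` against an expected set achieves the same determinism without sorting.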

On Thu, Nov 12, 2015 at 8:24 PM, Nick Dimiduk <[hidden email]> wrote:
On Thu, Nov 5, 2015 at 6:58 PM, Nick Dimiduk <[hidden email]> wrote:
Thanks Stephan, I'll check that out in the morning. Generally speaking, it would be great to have some single-jvm example tests for those of us getting started. Following the example of WindowingIntegrationTest is mostly working, though reusing my single sink instance with its static collection results in non-deterministic results; there appears to be a race between instances clearing the collection in their open method and the runtime returning the collection to my test harness.

This inconsistent test result is pretty frustrating. I've created a sample project with an IT that demonstrates the issue. Run `mvn test` multiple times and see that sometimes it passes and sometimes it fails. Maybe someone has some thoughts?


Thanks,
Nick

I'd also appreciate some guidance on stream composition. It's nice to use the fluent API when exploring data in a shell, but it seems to me like that API is cumbersome when composing data pipelines of reusable partials. Or maybe I'm doing it all wrong... Hence the request for more examples :)

While I'm asking, how might you model this: I have a set of predicates I'd like to flatMap over a stream. An input item should be compared against every predicate (basically, I want a Clojure-style juxt of predicates over each stream element). Imagine those predicates expressed as where clauses via the Table API. Say I have hundreds of thousands of these predicates to run over every stream event. Is the Java client API rich enough to express such a flow, or should I examine something lower-level than DataStream?

Thanks a lot, and sorry for all the newb questions.
-n


On Thursday, November 5, 2015, Stephan Ewen <[hidden email]> wrote:
Hey!


It should work well locally for testing. In that case you can write a program as usual and use "DataStreamUtils.collect(stream)", though you need to stop reading from it once you know the stream is exhausted...

Stephan


On Thu, Nov 5, 2015 at 10:38 PM, Nick Dimiduk <[hidden email]> wrote:
Hi Robert,

It seems "type" was what I needed. That said, it also looks like the
test jar has an undeclared dependency. In the end, the following
allowed me to use TestStreamEnvironment for my integration test.
Thanks a lot!

-n

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-core</artifactId>
      <version>${flink.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>





Re: Published test artifacts for flink streaming

Nick Dimiduk
Sorry Stephan, but I don't follow how global order applies in my case. I'm merely checking the size of the sink results. I assumed all tuples from a given test invocation have sunk before the next test begins, which is clearly not the case. Is there a way I can place a barrier in my tests to ensure one streaming DAG runs at a time, and that all buffers have been flushed to the sink before the next test begins?

What is "sink in DOP 1"?

Thanks,
Nick





Re: Published test artifacts for flink streaming

Stephan Ewen
Okay, I think I misunderstood your problem.

Usually you can simply execute tests one after another by waiting until "env.execute()" returns. The streaming jobs terminate by themselves once the sources reach the end of the stream (finite streams are supported that way), and the runtime makes sure all records flow through the entire stream first, so no barrier is needed. In short, if you use a source that reaches an end, no extra work is required.

If you have a proper infinite source (like Kafka), things are a bit more tricky, since you have a truly infinite streaming program. What we do in our Kafka tests is throw a "SuccessException" in the sink once we have seen all the data we expect. You can inspect the cause exceptions in a try/catch around env.execute() to check whether the program "failed" with a SuccessException or failed for a real reason.

A "sink in DOP 1" (sorry for the unclear terminology) is a sink with parallelism 1, so all data is collected by the same function instance.

Any of this helpful?

Stephan
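A Flink-free sketch of the SuccessException trick Stephan describes. "SuccessException" is the marker type he mentions; the harness class and everything else here are hypothetical glue. The sink throws the marker once it has seen everything it expects, and the test walks the cause chain of whatever env.execute() threw, since the runtime wraps user exceptions before rethrowing them:

```java
// Marker exception thrown by the test sink once all expected data arrived.
class SuccessException extends RuntimeException {}

class StreamTestHarness {
    // Walks the cause chain of whatever env.execute() threw: the runtime
    // wraps user exceptions, so the marker may sit several causes deep.
    static boolean failedWithSuccess(Throwable t) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }
}
```

In a real test you would wrap env.execute() in a try/catch and pass the caught exception to a check like this; finding a SuccessException anywhere in the cause chain means the sink saw all expected data, while anything else is a genuine failure.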







Re: Published test artifacts for flink streaming

Nick Dimiduk
Please see the above gist: my test makes no assertions until after the env.execute() call. Adding setParallelism(1) to my sink appears to stabilize my test. Indeed, very helpful. Thanks a lot!

-n







Re: Published test artifacts for flink streaming

lofifnc
Hi,

I'm currently working on improving the testing process of flink streaming applications.

I have written a test runtime that takes care of execution, collecting the output and applying a verifier to it. The runtime is able to provide test sources and sinks that run in parallel.
On top of that it offers an API for defining JUnit tests. I have recreated your test using my API:
https://gist.github.com/anonymous/8825ea83e8fe9afba19e
I have a second example that shows some of the more advanced features of the API and runtime:
https://gist.github.com/anonymous/fa5c77becae2e37d28eb

Unfortunately it's still a work in progress, but I'm planning to have a first release candidate at the end of the month.

Best, Alex

Re: Published test artifacts for flink streaming

Nick Dimiduk
Very interesting Alex!

One other thing I find useful in building data flows is using "builder" functions that hide the details of wiring up specific plumbing on generic input parameters, for instance void wireFoo(DataSource source, SinkFunction sink) { ... }. It would be great to have test tools that allow working with this kind of composition. For instance, I find that invoking setParallelism on the result of registering a SinkFunction on a flow is not very convenient with the current APIs; in this case, I would want to set the parallelism to something different in test than in production.

Any thoughts on how we might make testing compositions like this easier?

-n



Re: Published test artifacts for flink streaming

lofifnc
Hi Nick,

This is easily achievable using the framework I provide: createDataStream(Input<T> input) actually returns a DataStreamSource<T>.
And the call assertStream(DataStream<T> datastream, OutputMatcher<T> matcher) just attaches a TestSink<T> to the datastream, but you can also create the test sink manually with new TestSink<T>(OutputMatcher<T> matcher). So you're able to test a builder function with wireFoo(testSource, testSink).

I personally compose my data flows into smaller functions, e.g. KeyedDataStream sessionize(DataStream stream), so I'm able to test my pipelines partially. But if I want to adjust a setting inside these functions differently for test than for production, I have to add it to the parameters of my function: KeyedDataStream sessionize(DataStream stream, Long sessionTimeout). Another solution would be to use Flink's ParameterTool with a different configuration for test and production.
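The composition style described above can be sketched abstractly, without Flink types. Here Integer stands in for a stream, and every name is hypothetical; the point is only that each stage is a plain function whose tunables (like the session timeout) are parameters, so partial pipelines compose and can be tested in isolation:

```java
import java.util.function.Function;

// A pipeline stage is just a function from one stream type to another,
// so partial pipelines compose and each piece can be unit-tested alone.
// Integer stands in for a DataStream in this Flink-free sketch.
class Pipelines {
    // The session timeout is a parameter rather than a constant buried
    // inside the function, so tests and production can pass different values.
    static Function<Integer, Integer> sessionize(long sessionTimeoutMillis) {
        // Stand-in transformation: a real version would key and window the
        // stream; here we just double the value so composition is observable.
        return n -> n * 2;
    }

    static Function<Integer, Integer> enrich() {
        return n -> n + 1;
    }

    // Builder function in the wireFoo spirit: wire stages into one pipeline.
    static Function<Integer, Integer> wire(long sessionTimeoutMillis) {
        return sessionize(sessionTimeoutMillis).andThen(enrich());
    }
}
```

With real Flink types, sessionize would take and return streams the same way; the test-vs-production difference lives entirely in the arguments rather than inside the function bodies.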

Best, Alex.  

 

Re: Published test artifacts for flink streaming

Nick Dimiduk
Hi Alex,

How's your infra coming along? I'd love to up my unit testing game with your improvements :)

-n



Re: Published test artifacts for flink streaming

lofifnc
Hi,

If you want to play with it, you can find the source and basic documentation here: https://github.com/ottogroup/flink-spector.
The framework is feature-complete for now. At the moment I'm working on exposing some more functionality to the user, making the DSL more intuitive, and adding ScalaTest support. As soon as I've finished the first two points and made the core compatible with Scala, I will release Maven artifacts.

Best Alex!