Hello,
I would like to write test code for my Flink job. Looking at flink-examples, I thought the approach would be:

- Create a test class which extends StreamingMultipleProgramsTestBase
- In each method, write the streaming job as usual, but use a collection data source and an iterator sink
- Use a TestBaseUtils.compareResultXX method to check the result

Here is the actual code I wrote.

---
class SampleTestCase extends StreamingMultipleProgramsTestBase {
  @Test
  def testCase1(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.fromElements("aaa","bbb","aaa")
      .map{ x => (x,1)}
      .keyBy(0)
      .timeWindow(Time.seconds(1))
      .sum(1)
    env.execute()
    val result = DataStreamUtils.collect(stream.javaStream)
    TestBaseUtils.compareResultAsText(Lists.newArrayList(result),"(aaa,2)\n(bbb,1)")
  }
}
---

But when I ran the test, I got this error:

java.lang.AssertionError: Wrong number of elements result expected:<2> but was:<0>

It looks like the test finishes before the end of the timeWindow, but I do not know how to fix it.
Any advice would be appreciated.

Thanks,
Hironori Ogibayashi
Hi,
using DataStreamUtils.collect() in a test is difficult due to synchronization problems, as you discovered yourself. What I propose is to write a custom sink that collects data and verifies the results. Verification should happen both in the invoke() method and in close(). For the sink, you should set the parallelism to 1 to ensure that all data goes to one sink.

Another option is to use https://github.com/ottogroup/flink-spector which provides good ways of specifying expected outputs.

Maybe Alex has something else to say about it, I'm looping him in.

Cheers,
Aljoscha

On Fri, 27 May 2016 at 09:33 Hironori Ogibayashi <[hidden email]> wrote:
> Hello,
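A minimal sketch of the custom sink suggested above, assuming the Flink 1.x DataStream Scala API. `VerifyingSink` and `VerifyingSinkExample` are hypothetical names, not part of Flink, and the input is fed directly with `fromElements` instead of coming from the windowed job. The sketch only removes the need to synchronize with a collecting thread; it assumes the job under test actually emits its results before the finite source ends.

```scala
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical helper (not part of Flink): throws if the records reaching
// the sink differ from the expected set.
class VerifyingSink(expected: Set[(String, Int)])
    extends RichSinkFunction[(String, Int)] {

  // Records seen so far by this sink instance; meaningful only at parallelism 1.
  private var seen = Set.empty[(String, Int)]

  // Called once per record: reject anything that is not expected.
  override def invoke(value: (String, Int)): Unit = {
    if (!expected.contains(value)) {
      throw new AssertionError(s"Unexpected record: $value")
    }
    seen += value
  }

  // Called when the sink shuts down: every expected record must have arrived.
  override def close(): Unit = {
    val missing = expected -- seen
    if (missing.nonEmpty) {
      throw new AssertionError(s"Missing records: $missing")
    }
  }
}

object VerifyingSinkExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in input; in a real test this would be the stream under test.
    env.fromElements(("aaa", 2), ("bbb", 1))
      .addSink(new VerifyingSink(Set(("aaa", 2), ("bbb", 1))))
      .setParallelism(1) // all records go to a single sink instance

    // An error thrown inside the sink is expected to surface here as a failed job.
    env.execute()
  }
}
```

Because the check runs inside the job, the test method only has to call `env.execute()` and let a thrown exception fail the test. Using a set means duplicate records are not detected; a list or a counter per record would be needed for that.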
Hi,
Flinkspector is indeed a good choice to circumvent this problem, as it specifically has several mechanisms to deal with these synchronization problems. Unfortunately, I'm still looking for a reasonable solution to support checking of Scala types.

Maybe I will provide a version in which you can use all the functionality except the custom validation logic. This could be done relatively quickly.

Cheers,
Alex
Thank you for your response.
flink-spector looks really nice. I tried it but got some errors regarding types, maybe because of the issue Alex mentioned. I am looking forward to the new version.

Thanks,
Hironori

2016-05-30 16:45 GMT+09:00 lofifnc <[hidden email]>:
> Hi,
>
> Flinkspector is indeed a good choice to circumvent this problem as it
> specifically has several mechanisms to deal with these synchronization
> problems. Unfortunately, I'm still looking for a reasonable solution to
> support checking of scala types.
> Maybe I will provide a version in which you can use all the functionality
> except the custom validation logic. This could be done relatively quickly.
>
> Cheers,
> Alex
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Writing-test-for-Flink-streaming-jobs-tp7213p7247.html