Hi,
I have test code (https://github.com/nielsbasjes/yauaa/blob/v5.15/udfs/flink/src/test/java/nl/basjes/parse/useragent/flink/TestUserAgentAnalysisMapperInline.java#L140) that writes a DataStream to a List<> using LocalCollectionOutputFormat, so I can verify that the pipeline did what it should.
I was just upgrading to Flink 1.10 and found that writeUsingOutputFormat has apparently been deprecated. The comment (https://github.com/apache/flink/blob/release-1.10.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1066) says:

    * @deprecated Please use the {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink} explicitly using the …

I'm not writing to a file at all, and looking at the API, StreamingFileSink does not seem to fit what I'm doing.

What is the correct way in Flink 1.10 to write a test that verifies whether the output of my test run is valid?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
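For reference, the pattern described above (gathering a DataStream into a List<> through LocalCollectionOutputFormat) looks roughly like the sketch below. It assumes Flink 1.10 with flink-streaming-java on the classpath and a local, in-JVM execution environment; the class name, the sample data and the upper-casing map are made up for illustration and are not taken from the linked test.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class: the pre-1.10 style of collecting test output.
public class LocalCollectionSketch {

    @SuppressWarnings("deprecation") // writeUsingOutputFormat is deprecated in Flink 1.10
    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // one parallel instance keeps the output order deterministic

        // Illustrative pipeline; stands in for the real code under test.
        DataStream<String> upper = env.fromElements("a", "b", "c").map(s -> s.toUpperCase());

        List<String> result = new ArrayList<>();
        // LocalCollectionOutputFormat relies on the job running in this JVM (local execution).
        upper.writeUsingOutputFormat(new LocalCollectionOutputFormat<>(result));

        // Here an explicit execute() is required to actually run the job.
        env.execute("LocalCollectionOutputFormat sketch");
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```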
Hi,
To collect the elements of a DataStream (usually only meant for testing purposes), you can take a look at `DataStreamUtils#collect(DataStream)`.

Cheers,
Gordon
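A minimal sketch of Gordon's suggestion, assuming Flink 1.10, where `DataStreamUtils.collect(DataStream)` returns an `Iterator` over the stream's elements and is declared to throw `IOException`. The class name and pipeline are hypothetical. Note that there is no `env.execute()` call, because `collect()` submits the job itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class: collecting a bounded stream's output for a test.
public class CollectSketch {

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // one parallel instance keeps the output order deterministic

        // Illustrative pipeline; stands in for the real code under test.
        DataStream<String> upper = env.fromElements("a", "b", "c").map(s -> s.toUpperCase());

        // collect() attaches its own sink and starts the job itself,
        // so calling env.execute() afterwards is not needed.
        Iterator<String> it = DataStreamUtils.collect(upper);

        List<String> result = new ArrayList<>();
        it.forEachRemaining(result::add); // drains the iterator until the bounded job ends
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In a unit test, the returned list can then be compared against the expected records with an ordinary assertion.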
Hi Gordon,

Thanks, this works for me. What I find strange is that when I do this, it works (I made the differences bold):
However, this does not work:
and this also does not work:
In both of these cases it fails with:
Did I do something wrong? Is this a bug in DataStreamUtils?

Niels Basjes

On Mon, Feb 17, 2020 at 8:56 AM Tzu-Li Tai wrote:

Best regards / Met vriendelijke groeten,
Niels Basjes
Hey Niels,

This minimal Flink job executes in Flink 1.10:

    public static void main(String[] args) throws Exception {
    ...

Maybe the TestUserAgentAnalysisMapperInline class is doing some magic that breaks with the StreamGraphGenerator?

Best,
Robert

On Tue, Feb 18, 2020 at 9:59 AM Niels Basjes wrote:
I tried this in Flink 1.10.0:

    @Test

This results in:

...

On Fri, Feb 21, 2020 at 1:00 PM Robert Metzger wrote:

Best regards / Met vriendelijke groeten,
Niels Basjes
Hey,

You are right, I'm also seeing this exception now; it was hidden in other log output.

The stream graph is cleared on each execute(). That's why calling collect() and then execute() leads to the "no operators defined" error: collect() already executes the job, so the subsequent execute() finds an empty graph. However, if you call collect(), print(), execute(), then print() fills the stream graph again, and you are actually executing two Flink jobs: the collect job and the execute job.

I hope I got it right this time :)

Best,
Robert

On Fri, Feb 21, 2020 at 4:47 PM Niels Basjes wrote:
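Robert's explanation can be illustrated with a toy model. The sketch below is NOT Flink code: `ToyStreamEnvironment` and its methods are hypothetical stand-ins that only track operator names and count executed jobs. It simplifies two things: the real `collect()` submits its job asynchronously, and a real `print()` sink would pull its upstream operators into the second job, whereas here the model only records the sink itself.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of "the stream graph is cleared on each execute()". Not Flink code. */
public class ToyStreamEnvironment {
    // Pending operators, standing in for the environment's stream graph.
    private final List<String> pending = new ArrayList<>();
    // One entry per executed job, holding the operators that job contained.
    private final List<List<String>> executedJobs = new ArrayList<>();

    /** Stands in for adding an operator or a sink such as print(): the graph is filled again. */
    public void addOperator(String name) {
        pending.add(name);
    }

    /** Stands in for execute(): refuses an empty graph, otherwise runs it and clears it. */
    public void execute() {
        if (pending.isEmpty()) {
            // Wording modeled on the "no operators defined" error discussed in the thread.
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        executedJobs.add(new ArrayList<>(pending));
        pending.clear(); // the graph is cleared on each execute()
    }

    /** Stands in for DataStreamUtils.collect(): adds a collect sink and executes right away. */
    public void collect() {
        addOperator("collect sink");
        execute();
    }

    public int executedJobCount() {
        return executedJobs.size();
    }

    public static void main(String[] args) {
        // collect(), print(), execute(): two separate jobs run.
        ToyStreamEnvironment twoJobs = new ToyStreamEnvironment();
        twoJobs.addOperator("map");
        twoJobs.collect();
        twoJobs.addOperator("print sink");
        twoJobs.execute();
        System.out.println("jobs executed: " + twoJobs.executedJobCount()); // prints "jobs executed: 2"

        // collect(), execute(): the second call finds an empty graph and fails.
        ToyStreamEnvironment fails = new ToyStreamEnvironment();
        fails.addOperator("map");
        fails.collect();
        try {
            fails.execute();
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```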
Yes, that's it! It now does this:

    DataStream<TestRecord> resultDataStream = ...
    List<TestRecord> result = new ArrayList<>(5);

And, as you explained, because collect() already does an execute(), this works like a charm.

Niels

On Sat, Feb 22, 2020 at 1:38 AM Robert Metzger wrote:

Best regards / Met vriendelijke groeten,
Niels Basjes