Re: Issue in WordCount Example with DataStream API in BATCH RuntimeExecutionMode
Posted by Derek Sheng
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Issue-in-WordCount-Example-with-DataStream-API-in-BATCH-RuntimeExecutionMode-tp40280p40306.html
Thank you both very much!
Happy holidays!
Thanks for reporting this! This is not the expected behaviour; I have created
a Jira issue: https://issues.apache.org/jira/browse/FLINK-20764.
Best,
Aljoscha
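David's observation in the quoted message below contrasts two output shapes. The first is a hand-written KeyedProcessFunction that emits an updated count for every element (streaming-style). The second is the BATCH-mode expectation of exactly one final count per key. The following plain-Java sketch models both shapes so they can be compared side by side. It is not Flink API code, and the class and method names are hypothetical. In both shapes a word seen only once still produces output, and that is exactly what goes missing in the reported issue.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of two output shapes for a keyed count.
// This is NOT Flink code; it only illustrates what each mode should print.
class EmissionModes {

    // Streaming-style: emit the updated running count after every element,
    // similar to a process function that collects a result per input record.
    static List<String> incremental(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (String w : words) {
            int c = counts.merge(w, 1, Integer::sum); // new running count for w
            out.add(w + "=" + c);
        }
        return out;
    }

    // Batch-style: emit exactly one final count per key after the input ends.
    // Every key seen at least once must appear, including keys with count 1.
    static List<String> finalOnly(List<String> words) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String w : words) {
            counts.merge(w, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        counts.forEach((w, c) -> out.add(w + "=" + c)); // insertion order kept
        return out;
    }

    public static void main(String[] args) {
        List<String> words = List.of("to", "be", "or", "not", "to", "be");
        System.out.println(incremental(words)); // [to=1, be=1, or=1, not=1, to=2, be=2]
        System.out.println(finalOnly(words));   // [to=2, be=2, or=1, not=1]
    }
}
```

In the second line of output, the `or=1` and `not=1` entries are the kind of single-occurrence results that the BATCH-mode `sum` output omitted.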
On 23.12.20 22:26, David Anderson wrote:
> I did a little experiment, and I was able to reproduce this if I use the
> sum aggregator on KeyedStream to do the counting.
>
> However, if I implement my own counting in a KeyedProcessFunction, or if I
> use the Table API, I get correct results with RuntimeExecutionMode.BATCH --
> though the results are produced incrementally, as they would be in
> streaming mode.
>
> In FLIP-134: Batch execution for the DataStream API [1] it was decided to
> deprecate these relational methods -- such as sum -- on KeyedStream. But I
> don't know if this means this behavior is to be expected, or not.
>
> I've cc'ed @Aljoscha Krettek <[hidden email]>, who should be able to
> shed some light on this.
>
> Best,
> David
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API
>
> On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng <[hidden email]>
> wrote:
>
>> Hi team,
>>
>> Recently I have been exploring the new features of Flink 1.12, in
>> particular batch execution.
>>
>> I wrote a classic WordCount program locally that reads a text file and
>> counts the words (almost the same as the one in the Flink GitHub repo
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala).
>> After reading
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>> I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` right after
>> declaring `env` so that the job executes in BATCH mode. After running the
>> code, the printed output showed only the final counts instead of
>> incremental results, which is expected. *But I also noticed that none of
>> the words that appear only once were printed*. I have tried different
>> things, such as wrapping the word in a case class, and I have read the
>> documentation in more detail to see whether I missed anything, but I still
>> cannot figure it out. The default examples that come with the Flink
>> package give the same results, whereas with the DataSet API I do not see
>> this issue.
>>
>> Is there anything extra a user needs to specify or be aware of when using
>> BATCH execution mode in the DataStream API with Flink 1.12, or is this a
>> bug? The Flink version I used is 1.12 with Scala 2.11 (I also tried Java
>> 1.8 and observed the same issue).
>>
>> Please let me know if you need other info to help diagnose. Thank you very
>> much!
>>
>> Bests,
>>
>> Derek Sheng
>>
>
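The symptom Derek reports, BATCH-mode output that lacks every word occurring only once, can be confirmed mechanically by comparing the job's printed counts with a reference count computed outside Flink. The following is a minimal plain-Java sketch of such a check and is not part of Flink. The class and method names are hypothetical. The tokenization (lower-casing each line and splitting on `\W+`) is an assumption meant to approximate the example WordCount rather than reproduce it exactly. The `actual` map in `main` is simulated data standing in for parsed job output.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper for cross-checking a job's output against a reference
// word count computed in plain Java. Not part of Flink.
class WordCountCheck {

    // Assumed tokenization: lower-case, split on non-word characters,
    // and drop empty tokens.
    static Map<String, Integer> referenceCounts(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Words present in the reference counts but absent from the job output.
    static Set<String> missingWords(Map<String, Integer> expected, Map<String, Integer> actual) {
        Set<String> missing = new TreeSet<>(expected.keySet());
        missing.removeAll(actual.keySet());
        return missing;
    }

    public static void main(String[] args) {
        Map<String, Integer> expected =
            referenceCounts(Arrays.asList("To be, or not to be", "that is the question"));
        // Simulated job output that, like the reported symptom, lacks every count-1 word.
        Map<String, Integer> actual = new TreeMap<>(Map.of("to", 2, "be", 2));
        System.out.println(missingWords(expected, actual)); // [is, not, or, question, that, the]
    }
}
```

Parsing the job's real printed output into the `actual` map is left out because its format depends on how the results are printed.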