Hi team,
Recently I am trying to explore the new features of Flink 1.12 with Batch Execution. I locally wrote a classic WordCount program to read from text file and count the words (almost same as the one in Flink Github repo https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala), and after reading https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declare the "env" to make it execute under BATCH mode. After running the code, the printed results showed only final count results instead of incremental results, which is expected. But I also notice, all the words that only appear once have NOT been printed out. I have tried different things like wrap the word in a case class etc, and read more details and see if I have missed anything but still not able to figure out (And I have tried the default examples come with the Flink package and got same results, and with using DataSet API I do not see this issue). Is there anything extra user need to specify or notice when using BATCH execution mode in datastream API with Flink 1.12 or this is kind of a bug please? The flink version I used is 1.12 with scala 2.11 (also tried java 1.8 and observed same issue) Please let me know if you need other info to help diagnose. Thank you very much! Bests, Derek Sheng |
I did a little experiment, and I was able to reproduce this if I use the sum aggregator on KeyedStream to do the counting. However, if I implement my own counting in a KeyedProcessFunction, or if I use the Table API, I get correct results with RuntimeExecutionMode.BATCH -- though the results are produced incrementally, as they would be in streaming mode. In FLIP-134: Batch execution for the DataStream API [1] it was decided to deprecate these relational methods -- such as sum -- on KeyedStream. But I don't know if this means this behavior is to be expected, or not. Best, David On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng <[hidden email]> wrote:
|
Thanks for reporting this! This is not the expected behaviour, I created
a Jira Issue: https://issues.apache.org/jira/browse/FLINK-20764. Best, Aljoscha On 23.12.20 22:26, David Anderson wrote: > I did a little experiment, and I was able to reproduce this if I use the > sum aggregator on KeyedStream to do the counting. > > However, if I implement my own counting in a KeyedProcessFunction, or if I > use the Table API, I get correct results with RuntimeExecutionMode.BATCH -- > though the results are produced incrementally, as they would be in > streaming mode. > > In FLIP-134: Batch execution for the DataStream API [1] it was decided to > deprecate these relational methods -- such as sum -- on KeyedStream. But I > don't know if this means this behavior is to be expected, or not. > > I've cc'ed @Aljoscha Krettek <[hidden email]>, who should be able to > shed some light on this. > > Best, > David > > [1] > https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API > > On Wed, Dec 23, 2020 at 8:22 PM Derek Sheng <[hidden email]> > wrote: > >> Hi team, >> >> Recently I am trying to explore the new features of Flink 1.12 with Batch >> Execution. >> >> I locally wrote a classic WordCount program to read from text file and >> count the words (almost same as the one in Flink Github repo >> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala), >> and after reading >> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html >> I added `env.setRuntimeMode(RuntimeExecutionMode.BATCH);` after declare the >> "env" to make it execute under BATCH mode. After running the code, the >> printed results showed only final count results instead of incremental >> results, which is expected. *But I also notice, all the words that only >> appear once have NOT been printed out*. I have tried different things >> like wrap the word in a case class etc, and read more details and see if I >> have missed anything but still not able to figure out (And I have tried the >> default examples come with the Flink package and got same results, and with >> using DataSet API I do not see this issue). >> >> Is there anything extra user need to specify or notice when using BATCH >> execution mode in datastream API with Flink 1.12 or this is kind of a bug >> please? The flink version I used is 1.12 with scala 2.11 (also tried java >> 1.8 and observed same issue) >> >> Please let me know if you need other info to help diagnose. Thank you very >> much! >> >> Bests, >> >> Derek Sheng >> > |
Thank you both very much! Happy holidays! Aljoscha Krettek <[hidden email]> 于2020年12月24日周四 下午4:00写道: Thanks for reporting this! This is not the expected behaviour, I created |
Free forum by Nabble | Edit this page |