While testing the Apache Flink streaming API, I found something odd with a simple example.
The code below counts words over a sliding window of 10 seconds, emitting a result every 5 seconds. For the first 10 seconds the counts look right, but after that every print is wrong: it shows only a single word, with a count of 1. Is there something wrong with my code?
Thanks in advance!
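    import java.util.concurrent.TimeUnit.SECONDS
    import scala.util.Random
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    import org.apache.flink.streaming.api.windowing.helper.Time

    // Source: emits a random word roughly every 300 ms, forever
    def generateWords(ctx: SourceContext[String]): Unit = {
      val words = List("amigo", "brazo", "pelo")
      while (true) {
        Thread.sleep(300)
        ctx.collect(words(Random.nextInt(words.length)))
      }
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(generateWords _)

    // Pair each word with a count of 1, then window: 10 s long, evaluated every 5 s
    val windowedStream = stream.map((_, 1))
      .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

    // Sum the counts per word inside each window
    val wordCount = windowedStream
      .groupBy("_1")
      .sum("_2")

    // Print one list of (word, count) pairs per window
    wordCount
      .getDiscretizedStream()
      .print()

    env.execute("sum randoms")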
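For reference, here is how I understand a 10 s window evaluated every 5 s, written as a plain-Scala model with no Flink involved. `SlidingWindows`, `windowStartsFor` and `countPerWindow` are hypothetical helpers, and the model assumes windows are aligned to multiples of the slide, so the very first window is only half filled. It is a sketch of the intended semantics, not of how Flink implements them.

```scala
// Hypothetical model, not Flink API: windows [start, start + size) aligned to multiples of slide
object SlidingWindows {
  // Start times (ms) of every window that contains timestamp ts (assumes ts >= 0)
  def windowStartsFor(ts: Long, size: Long, slide: Long): Seq[Long] = {
    val lastStart = ts - (ts % slide)          // latest window starting at or before ts
    val firstStart = lastStart - size + slide  // earliest window that still covers ts
    firstStart to lastStart by slide
  }

  // Per-word counts for each window, keyed by window start, from (timestampMs, word) events
  def countPerWindow(events: Seq[(Long, String)], size: Long, slide: Long): Map[Long, Map[String, Int]] =
    events
      .flatMap { case (ts, word) => windowStartsFor(ts, size, slide).map(start => (start, word)) }
      .groupBy(_._1)
      .map { case (start, pairs) =>
        start -> pairs.groupBy(_._2).map { case (w, ws) => w -> ws.size }
      }
}
```

Fed one word every 300 ms, as `generateWords` does, every full 10 s window in this model holds 33 words, so a print containing a single word with a count of 1 is not what a sliding window should produce.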
The output is:
    [(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
    [(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
    [(brazo,1)]
    [(amigo,1)]
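If each print covers the last 10 seconds, and the first print covers only the first 5 seconds (as the annotations on the output suggest), then every print should be the sum of two consecutive 5-second panes. The helpers below are hypothetical, plain Scala rather than Flink API; they recover the second pane from the first two prints. Under that assumption the second pane holds pelo 6, brazo 4 and amigo 7, so the third print (second pane plus third pane) should show at least those counts.

```scala
// Hypothetical helpers, not Flink API: a 10 s window = two adjacent 5 s panes
object Panes {
  // Window count = per-word sum of an older and a newer pane
  def merge(older: Map[String, Int], newer: Map[String, Int]): Map[String, Int] =
    (older.keySet ++ newer.keySet).map { w =>
      w -> (older.getOrElse(w, 0) + newer.getOrElse(w, 0))
    }.toMap

  // Newer pane = window count minus the older pane's count, word by word
  def newerPane(window: Map[String, Int], older: Map[String, Int]): Map[String, Int] =
    window.map { case (w, n) => w -> (n - older.getOrElse(w, 0)) }
}

// Counts taken from the first two prints of the output
val pane1 = Map("pelo" -> 3, "brazo" -> 1, "amigo" -> 2)
val window2 = Map("pelo" -> 9, "brazo" -> 5, "amigo" -> 9)
val pane2 = Panes.newerPane(window2, pane1)
```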