Testing the apache flink stream API, I found something weird with a simple example.
This code counts the words every 5 seconds under a window of 10 seconds. Until the 10 first seconds, counts sound good, after that, every print shows a wrong count - one per word. There is something wrong in my code? 
Thanks in advance!
def generateWords(ctx: SourceContext[String]) = {
  val words = List("amigo", "brazo", "pelo")
  while (true) {
    Thread.sleep(300)
    ctx.collect(words(Random.nextInt(words.length)))
  }
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(generateWords _)
val windowedStream = stream.map((_, 1))
  .window(Time of(10, SECONDS)).every(Time of(5, SECONDS))
val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")
wordCount
 .getDiscretizedStream()
  .print()
env.execute("sum randoms")
The output is:
[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)] 
[(amigo,1)]