Simple streaming word count doesn't work (Scala)

Francis Aranda

While testing the Apache Flink streaming API, I found something odd with a simple example.

This code counts words every 5 seconds over a 10-second window. For the first 10 seconds the counts look right; after that, every print shows a wrong count of one per word. Is there something wrong in my code?


Thanks in advance!

// Emits a random word from a fixed list every 300 ms, forever.
def generateWords(ctx: SourceContext[String]) = {
  val words = List("amigo", "brazo", "pelo")
  while (true) {
    Thread.sleep(300)
    ctx.collect(words(Random.nextInt(words.length)))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

val stream = env.addSource(generateWords _)

// Pair each word with 1, then apply a 10-second window that slides every 5 seconds.
val windowedStream = stream.map((_, 1))
  .window(Time of(10, SECONDS)).every(Time of(5, SECONDS))

// Group by word (tuple field _1) and sum the counts (tuple field _2) per window.
val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")

wordCount
  .getDiscretizedStream()
  .print()

env.execute("sum randoms")

The output is:

[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)] 
[(amigo,1)]
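
For comparison, here is a minimal plain-Scala sketch (no Flink involved) of what a 10-second window evaluated every 5 seconds would be expected to produce. The object name, the `countsAt` helper, and the deterministic event list are all hypothetical stand-ins for the random source above; this is only a reference for the intended semantics, not how Flink computes it.

```scala
object SlidingWindowReference {
  // (timestampMillis, word): a deterministic stand-in for the random source,
  // one word every 300 ms, cycling through the three words.
  val words = Vector("amigo", "brazo", "pelo")
  val events: Vector[(Long, String)] =
    (0 until 100).toVector.map(i => (i * 300L, words(i % words.length)))

  // Count the words whose timestamp falls in [end - size, end).
  def countsAt(end: Long, size: Long): Map[String, Int] =
    events
      .filter { case (ts, _) => ts >= end - size && ts < end }
      .groupBy(_._2)
      .map { case (w, es) => (w, es.size) }

  def main(args: Array[String]): Unit = {
    // Emit a result every 5 s for a window covering the last 10 s.
    for (end <- 5000L to 30000L by 5000L)
      println(s"t=${end / 1000}s " +
        countsAt(end, 10000L).toSeq.sorted.mkString("[", ", ", "]"))
  }
}
```

With one word every 300 ms, each full window should hold roughly 33 words in total, so once the first 10 seconds have passed the per-word counts should stay at about that level rather than fall back to one.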

Re: Simple streaming word count doesn't work (Scala)

Aljoscha Krettek
Hi Francis,
I'm afraid this is a very strange bug that results from the interplay between pre-aggregation (an optimization that aggregates the elements of a window as they arrive) and the window size/slide size you use. With some time values it works, but with others it doesn't.
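
To illustrate the general idea of pre-aggregation for sliding windows, here is a small plain-Scala sketch. It is not Flink's implementation; the object and function names are hypothetical, and it assumes the window size is a multiple of the slide and that windows end on slide boundaries. Elements are summed into slide-sized panes as they arrive, and each window result is the merge of the panes it covers.

```scala
object PanePreAggregation {
  type Counts = Map[String, Int]

  // Merge two partial count maps by summing the per-word counts.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (w, n)) => acc.updated(w, acc.getOrElse(w, 0) + n) }

  // Pre-aggregate events into panes of `slide` ms, keyed by pane index.
  def panes(events: Seq[(Long, String)], slide: Long): Map[Long, Counts] =
    events.foldLeft(Map.empty[Long, Counts]) { case (acc, (ts, w)) =>
      val pane = ts / slide
      acc.updated(pane, merge(acc.getOrElse(pane, Map.empty[String, Int]), Map(w -> 1)))
    }

  // The window ending at `end` is the merge of the size/slide panes before `end`.
  def windowAt(ps: Map[Long, Counts], end: Long, size: Long, slide: Long): Counts = {
    val last = end / slide - 1
    val first = last - size / slide + 1
    (first to last)
      .map(p => ps.getOrElse(p, Map.empty[String, Int]))
      .foldLeft(Map.empty[String, Int])(merge)
  }

  def main(args: Array[String]): Unit = {
    // Deterministic stand-in for the random source: one word every 300 ms.
    val words = Vector("amigo", "brazo", "pelo")
    val events = (0 until 100).map(i => (i * 300L, words(i % words.length)))
    val ps = panes(events, 5000L)
    for (end <- 5000L to 30000L by 5000L)
      println(s"t=${end / 1000}s " +
        windowAt(ps, end, 10000L, 5000L).toSeq.sorted.mkString("[", ", ", "]"))
  }
}
```

In a scheme like this, a 10-second window with a 5-second slide is always the sum of two panes, so a result that contains only a single element per word suggests that partial aggregates were dropped or emitted too early.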

I'm sorry you ran into this problem. We are currently reworking the windowing logic, so in the next release (0.10) this should be rock solid.

Cheers,
Aljoscha

On Mon, 14 Sep 2015 at 09:17 Francis Aranda <[hidden email]> wrote:

Testing the apache flink stream API, I found something weird with a simple example. [...]