How to sort tuples in DataStream

Saiph Kappa
Hi,

I'm trying to write a simple Flink streaming application that counts the top N words per window, but I don't know how to sort the words by their frequency in Flink.

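In Spark Streaming, I would do something like this:
«
val isAscending = false // descending: most frequent words first
stream.reduceByKeyAndWindow(reduceFunc, Seconds(10), Seconds(10))
  .map(_.swap).transform(_.sortByKey(isAscending)) // (count, word): sort each window by count
  .map(_._2) // keep only the words
»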

How can I do this with Flink's DataStream API?

This is what I have so far:
«
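// Assumes Separator is a Char such as '|' and the aggregated field index
// comes from params.aggParams, as in the original program.
val aggIndex = params.aggParams.get.head.aggIndex
val reduceFunc = (a: String, b: String) => {
  val aElems = a.split(Separator)
  val bElems = b.split(Separator)
  val sum = aElems(aggIndex).toInt + bElems(aggIndex).toInt
  // Rebuild the record so the key field survives the reduce
  aElems.updated(aggIndex, sum.toString).mkString(Separator.toString)
}
// keyBy(0) only works on tuples; key the String records by their first field
stream.keyBy(_.split(Separator)(0))
  .timeWindow(Time.seconds(10), Time.seconds(10)).reduce(reduceFunc)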
»

My stream is just a series of strings like "field1|field2|field3|..."

Thanks.

Re: How to sort tuples in DataStream

Stephan Ewen
Hi!

Since a stream is infinite, you cannot sort it as a whole (Flink does not follow the mini-batch model, so there is no per-batch RDD to sort as in Spark Streaming). You can only sort within windows.
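To illustrate "sort only within windows", here is a plain-Scala sketch (not Flink code) that buckets hypothetical `(timestampMs, word)` events into 10-second tumbling windows and sorts each finite window by word frequency. The object name, event shape and tie-breaking rule are assumptions made for the example.

```scala
// Plain-Scala illustration (no Flink): an unbounded stream can't be sorted as a
// whole, but each finite window can. Events are hypothetical (timestampMs, word) pairs.
object SortPerWindow {
  val WindowSizeMs = 10000L // 10-second tumbling windows, as in the question

  // Assign each event to the window [start, start + WindowSizeMs) and sort
  // the words of each window by frequency, most frequent first.
  def sortedWindows(events: Seq[(Long, String)]): Seq[(Long, Seq[(String, Int)])] =
    events
      .groupBy { case (ts, _) => ts - (ts % WindowSizeMs) } // key = window start
      .toSeq
      .sortBy(_._1) // windows in time order
      .map { case (start, evs) =>
        val counts = evs.groupBy(_._2).map { case (w, ws) => (w, ws.size) }.toSeq
        // descending count; ties broken by word so the order is deterministic
        (start, counts.sortBy { case (w, c) => (-c, w) })
      }

  def main(args: Array[String]): Unit = {
    val events = Seq((1000L, "a"), (2000L, "b"), (3000L, "a"), (12000L, "b"), (15000L, "b"))
    sortedWindows(events).foreach(println)
  }
}
```

Each window is a finite collection, so an ordinary sort applies; nothing is ever sorted across window boundaries.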

I assume you key by word and sum up the counts. Since you want the most frequent words, you need to sort across keys, which you can do in a windowAll() function. Because the sort is global, this will end up being a non-parallel step.
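A minimal sketch of what the non-parallel windowAll() step would compute, written as plain Scala rather than Flink code: its input is assumed to be every `(word, count)` pair that the keyed, windowed sum produced for one window. `GlobalTopN.topN` is a hypothetical helper name; wiring it into a Flink all-window function is left out.

```scala
// Hypothetical helper: the logic of a non-parallel windowAll() step. Its input
// is every (word, count) pair that the keyed, windowed sum produced for one window.
object GlobalTopN {
  def topN(windowCounts: Iterable[(String, Int)], n: Int): Seq[(String, Int)] =
    windowCounts.toSeq
      .sortBy { case (word, count) => (-count, word) } // descending count, then word
      .take(n)

  def main(args: Array[String]): Unit = {
    // e.g. the per-word sums of one 10-second window after keyBy(word)
    val perKeySums = Seq(("flink", 7), ("spark", 3), ("stream", 7), ("window", 1))
    println(topN(perKeySums, 2))
  }
}
```

This sorts every distinct word in the window (O(k log k) for k words), which is why a bounded structure is cheaper when N is much smaller than k.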

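A more efficient variant is to keep a heap bounded to N entries in the windowAll() function (holding the N most frequent words seen so far, ordered so that the least frequent of them sits at the root and can be evicted), update it with each new element, and emit its contents at the end of the window. A fold() function should allow you to implement that.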
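The bounded-heap idea can be sketched in plain Scala, with `foldLeft` standing in for the windowAll() fold (the Flink-specific wiring is omitted, and `BoundedHeapTopN` is a hypothetical name). The heap's root is the least frequent of the N words kept so far, effectively a min-heap on count, so each new element costs O(log N) instead of a full sort.

```scala
import scala.collection.mutable

object BoundedHeapTopN {
  // Ranks entries by (-count, word): a smaller key is a better rank.
  // PriorityQueue dequeues its greatest element, so the head is the worst
  // entry kept (effectively a min-heap on count).
  private val worstOnTop: Ordering[(String, Int)] =
    Ordering.by[(String, Int), (Int, String)] { case (word, count) => (-count, word) }

  def topN(elements: Iterable[(String, Int)], n: Int): List[(String, Int)] = {
    val heap = elements.foldLeft(mutable.PriorityQueue.empty[(String, Int)](worstOnTop)) {
      (h, e) =>
        h.enqueue(e)
        if (h.size > n) h.dequeue() // evict the worst entry; at most n remain
        h
    }
    // dequeue() yields worst-first, so reverse to list the most frequent words first
    List.fill(heap.size)(heap.dequeue()).reverse
  }

  def main(args: Array[String]): Unit =
    println(topN(Seq(("flink", 7), ("spark", 3), ("stream", 7), ("window", 1)), 2))
}
```

Memory stays at N + 1 entries per window regardless of how many distinct words arrive, which is the advantage over collecting and sorting everything.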

BTW: It is probably also more efficient to parse the Strings into numbers once at the beginning of the program, rather than splitting and converting them in every reduce call.
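A small plain-Scala sketch of parsing once up front, assuming a hypothetical record layout of `"word|count|..."` with the count in the second field. In a Flink job, `parse` would be applied in a single map right after the source, so the reduce only adds integers. The `WordCount` case class and function names are illustrative, not part of any API.

```scala
// Hypothetical record layout: "word|count|..." with the count in field 1.
object ParseOnce {
  final case class WordCount(word: String, count: Int)

  // Split once with the Char overload: split("|") would treat "|" as a regex
  // and split between every character.
  def parse(line: String): WordCount = {
    val fields = line.split('|')
    WordCount(fields(0), fields(1).trim.toInt)
  }

  // Downstream reduce works on typed values: no re-splitting or toInt per call
  def merge(a: WordCount, b: WordCount): WordCount = WordCount(a.word, a.count + b.count)

  def main(args: Array[String]): Unit = {
    val records = Seq("flink|3|x", "spark|1|y", "flink|4|z").map(parse)
    println(records.groupBy(_.word).values.map(_.reduce(merge)))
  }
}
```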

Stephan

(In reply to Saiph Kappa's message of Mon, Jan 11, 2016 at 7:41 PM.)