Understanding Kmeans in Flink

Understanding Kmeans in Flink

Hajira Jabeen
Hello everyone,

I am trying to understand Kmeans in Flink, Scala.

I can see that the attached Kmeans-snippet (taken from Flink examples) updates centroids.

In (1), the map function assigns each point to its nearest centroid.
In (3), the points are grouped by centroid ID.
In (4), the x and y coordinates are added up.

However, I cannot understand what happens at (2) and then at (5).
I would really appreciate it if anyone could explain how this works.


Thanks
Hajira


-------------------------------------
K means code snippet
--------------------------------------
val newCentroids = points
  .map(new SelectNearestCenter()).withBroadcastSet(currentCentroids, "centroids") // (1)
  .map { x => (x._1, x._2, 1L) }                                    // (2)
  .groupBy(0)                                                       // (3) by centroid ID
  .reduce { (p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3) }  // (4)
  .map { x => new Centroid(x._1, x._2.div(x._3)) }                  // (5)
newCentroids
--------------------------------------



Re: Understanding Kmeans in Flink

Márton Balassi
Hey Hajira,

Basically, lines 2) to 5) compute the "mean" (centroid) of each new cluster that we have just defined by assigning the points in line 1). Because calculating the mean is not an associative operation (partial means cannot simply be averaged again), we break it down into two associative parts, summation and counting, and then divide the sum by the count to get the desired output. This is needed because Flink jobs run in a distributed environment, where partial results are combined in no particular order.
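To make the associativity point concrete, here is a minimal sketch in plain Scala (no Flink involved). The object name, the two "partitions" and their values are made up for illustration; they only show why partial means cannot be merged while (sum, count) pairs can.

```scala
object MeanOfPartials {
  // Hypothetical data: the x-coordinates of one cluster, split across two workers.
  val partA: Seq[Double] = Seq(1.0, 2.0, 3.0) // three values
  val partB: Seq[Double] = Seq(10.0)          // one value

  def mean(xs: Seq[Double]): Double = xs.sum / xs.size

  // Averaging the partial means gives both partitions equal weight, which is wrong.
  val meanOfMeans: Double = mean(Seq(mean(partA), mean(partB)))

  // (sum, count) pairs can be merged pairwise, in any order, without losing information.
  def combine(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)

  val (sum, count) =
    combine((partA.sum, partA.size.toLong), (partB.sum, partB.size.toLong))

  // One division at the very end yields the correct mean.
  val trueMean: Double = sum / count

  def main(args: Array[String]): Unit =
    println(s"mean of means = $meanOfMeans, true mean = $trueMean")
    // prints "mean of means = 6.0, true mean = 4.0"
}
```

The `combine` function plays the same role as the reduce in line 4): it only ever adds, so it does not matter how the data was partitioned.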

Remember that line 1) outputs (Int, Point) tuples, so the result of 2) is (Int, Point, Long), where the third field holds the count, initialized to 1L. Line 4) not only sums the Point coordinates, but does the same for the count (remember: summation and counting to get the mean). At line 5), all that is left is to divide the sum of the points (which is still a point) by the number of points, which gives the new centroid.
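The same steps can be tried out on ordinary Scala collections. This is only a sketch under assumptions: `Point` below is a hypothetical stand-in for the class used in the Flink example (only `add` and `div` are modelled), the input data is invented, and `groupBy` on a `Seq` takes a function rather than a field index as in Flink.

```scala
object CentroidUpdateSketch {
  // Hypothetical stand-in for the example's Point class.
  final case class Point(x: Double, y: Double) {
    def add(other: Point): Point = Point(x + other.x, y + other.y)
    def div(n: Long): Point = Point(x / n, y / n)
  }

  // What step (1) would produce: (centroid ID, point) pairs. Values are made up.
  val assigned: Seq[(Int, Point)] = Seq(
    (1, Point(0.0, 0.0)),
    (1, Point(2.0, 4.0)),
    (2, Point(10.0, 10.0))
  )

  val newCentroids: Map[Int, Point] = assigned
    .map { case (id, p) => (id, p, 1L) }   // (2) attach a count of 1 to every point
    .groupBy(_._1)                         // (3) group by centroid ID
    .map { case (id, group) =>
      // (4) sum the points and the counts within each group
      val (_, sum, count) = group.reduce { (p1, p2) =>
        (p1._1, p1._2.add(p2._2), p1._3 + p2._3)
      }
      // (5) sum of points divided by number of points = new centroid position
      id -> sum.div(count)
    }

  def main(args: Array[String]): Unit =
    newCentroids.toSeq.sortBy(_._1).foreach(println)
}
```

With this input, centroid 1 ends up at Point(1.0,2.0), the mean of its two points, and centroid 2 stays at Point(10.0,10.0) because it has a single point.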

Flink also comes with a Java implementation [1] of the same algorithm, which has somewhat more boilerplate but can definitely be helpful for understanding the algorithm.

I hope this helps,

Marton


On Mon, Dec 28, 2015 at 5:21 PM, Hajira Jabeen <[hidden email]> wrote: