In this answer, "Optimizing Flink transformation", I wrote a function to
compute Symmetrical Uncertainty with reduceGroup:

    def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
      val su = xy.reduceGroup { in ⇒
        // Materialize the whole group so it can be traversed more than once.
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        2 * mu / (Hx + Hy)
      }
      // reduceGroup yields a one-element DataSet; fetch that single value.
      su.collect().head
    }
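The helpers `mutualInformation` and `entropy` are not shown here. As a rough sketch of what they might look like, assuming the values are discrete (or already discretized) and probabilities are estimated from frequencies, something like the following plain Scala would do. The object name `SuHelpers` and these exact definitions are assumptions for illustration, not the code from the linked answer.

```scala
object SuHelpers {
  private def log2(v: Double): Double = math.log(v) / math.log(2)

  // Shannon entropy (in bits) of the empirical distribution of `xs`.
  def entropy[A](xs: Seq[A]): Double = {
    val n = xs.size.toDouble
    xs.groupBy(identity).values.map { group =>
      val p = group.size / n
      -p * log2(p)
    }.sum
  }

  // I(X;Y) = H(X) + H(Y) - H(X,Y); the joint entropy is taken over the pairs.
  def mutualInformation(x: Seq[Double], y: Seq[Double]): Double =
    entropy(x) + entropy(y) - entropy(x zip y)

  // SU = 2 * I(X;Y) / (H(X) + H(Y)); NaN when both entropies are zero.
  def symmetricalUncertainty(x: Seq[Double], y: Seq[Double]): Double =
    2 * mutualInformation(x, y) / (entropy(x) + entropy(y))
}
```

Note that every helper here needs the complete vectors in memory, which is why the whole group is materialized inside `reduceGroup`.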
But it is slow on big data sets.
I've read about combinable GroupReduceFunctions in Flink's
documentation, and I am trying to write a GroupReduceFunction to compute
Symmetrical Uncertainty:
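    import java.lang.Iterable
    import scala.collection.JavaConverters._
    import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction}
    import org.apache.flink.util.Collector

    class MyCombinableGroupReducer
      extends GroupReduceFunction[(Double, Double), Double]
      with GroupCombineFunction[(Double, Double), (Double, Double)] {

      override def reduce(
        in: Iterable[(Double, Double)],
        out: Collector[Double]): Unit = {
        // Materialize once: Flink's group iterable can only be traversed a single time.
        val invec = in.asScala.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)
        // collect...
      }

      override def combine(
        in: Iterable[(Double, Double)],
        out: Collector[(Double, Double)]): Unit = {
        // ...
      }
    }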
Is it possible to compute this measure in parallel? I do not know what
I should write in the reduce and combine functions.
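One direction I am considering, sketched below in plain Scala without any Flink wiring: if the values are discrete (an assumption), both entropies and the mutual information depend only on how often each (y, x) pair occurs, and count tables merge associatively, which is the property a combiner needs. The names `SuFromCounts`, `Counts`, `count` and `merge` are hypothetical, and the combine output type in the class above would have to change to carry counts rather than `(Double, Double)`.

```scala
object SuFromCounts {
  // Joint frequency table keyed by the original (y, x) tuple.
  type Counts = Map[(Double, Double), Long]

  // What a combiner could emit for one partition.
  def count(pairs: Seq[(Double, Double)]): Counts =
    pairs.groupBy(identity).map { case (k, v) => k -> v.size.toLong }

  // Associative and commutative merge of two partial tables.
  def merge(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  // Entropy in bits from raw counts.
  private def entropyOf(counts: Iterable[Long]): Double = {
    val n = counts.sum.toDouble
    counts.map { c => val p = c / n; -p * math.log(p) / math.log(2) }.sum
  }

  // Final step, run once on the fully merged table.
  def symmetricalUncertainty(joint: Counts): Double = {
    // x is the second tuple field and y the first, as in the code above.
    val hx = entropyOf(joint.groupBy(_._1._2).values.map(_.values.sum))
    val hy = entropyOf(joint.groupBy(_._1._1).values.map(_.values.sum))
    val hxy = entropyOf(joint.values)
    2 * (hx + hy - hxy) / (hx + hy) // NaN when both entropies are zero
  }
}
```

With this shape, `combine` would build a partial table per partition and `reduce` would merge the partial tables before applying the formula, but I am not sure this is the right way to structure it in Flink.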