DistributedMatrix in Flink

Lydia Ickler
Hi all,

As mentioned before, I am trying to port the RowMatrix from Spark to Flink…

In the code I have already run into a dead end… In the function multiplyGramianMatrixBy() (see end of mail) there is the line:
rows.context.broadcast(v) (rows is a DataSet[Vector])
What exactly is this line doing? Does it put the "content" of v into the variable rows?
And another question:
What does the function treeAggregate do? And how would you go about replicating it in Flink?

Thanks in advance!
Best regards, 
Lydia


// Breeze aliases used below, as in Spark MLlib's RowMatrix
import breeze.linalg.{axpy => brzAxpy, DenseVector => BDV, SparseVector => BSV}

private[flink] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
  val n = numCols().toInt

  // Spark: ship a read-only copy of v to every executor
  val vbr = rows.context.broadcast(v)

  // Computes A^T A v as the sum over all rows r of (r . v) * r
  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (U, r) => {
      val rBrz = r.toBreeze
      val a = rBrz.dot(vbr.value)  // a Spark Broadcast exposes its payload as .value
      rBrz match {
        // use specialized axpy for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    }, combOp = (U1, U2) => U1 += U2)
}
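For reference, the method computes the Gramian-vector product A^T A v without ever forming the n x n matrix A^T A: since A^T A is the sum over all rows r of r r^T, A^T A v is the sum over all rows of (r . v) * r, which is exactly what seqOp accumulates with axpy (U += a * r with a = r . v). A minimal local sketch in plain Scala, assuming the rows fit in memory as Array[Double]; no Spark, Flink or Breeze is involved and all names are hypothetical:

```scala
// Hypothetical local helpers: rows are plain Array[Double], no Spark, Flink or Breeze
object GramianSketch {
  type Vec = Array[Double]

  def dot(x: Vec, y: Vec): Double = {
    require(x.length == y.length, "dimension mismatch")
    var s = 0.0
    var i = 0
    while (i < x.length) { s += x(i) * y(i); i += 1 }
    s
  }

  // In place: y += a * x (the same contract as Breeze's axpy(a, x, y))
  def axpy(a: Double, x: Vec, y: Vec): Unit = {
    var i = 0
    while (i < x.length) { y(i) += a * x(i); i += 1 }
  }

  // A^T A v without forming A^T A: for every row r, add (r . v) * r to the accumulator
  def multiplyGramianBy(rows: Seq[Vec], v: Vec): Vec = {
    val u = new Array[Double](v.length)
    rows.foreach(r => axpy(dot(r, v), r, u))
    u
  }

  // Reference: explicitly build the n x n Gramian G = A^T A, then compute G v
  def viaExplicitGramian(rows: Seq[Vec], v: Vec): Vec = {
    val n = v.length
    val g = Array.tabulate(n, n)((i, j) => rows.map(r => r(i) * r(j)).sum)
    g.map(gi => dot(gi, v))
  }
}

// GramianSketch.multiplyGramianBy(Seq(Array(1.0, 2.0), Array(3.0, 4.0)), Array(1.0, 1.0))
// returns Array(24.0, 34.0), the same result as viaExplicitGramian
```

The row-wise form is what makes the distributed version cheap: each row only needs v, and the partial sums are plain n-vectors that can be added in any order.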

Re: DistributedMatrix in Flink

Till Rohrmann

Hi Lydia,

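Spark and Flink are not identical. Thus, you’ll find concepts in each system which don’t have a corresponding counterpart in the other. For example, rows.context.broadcast(v) broadcasts the value v so that you can use it on all executors. It does not put v into rows; it ships a read-only copy of v to every executor, where the seqOp closure can read it through the returned Broadcast handle. Flink follows a slightly different concept when you broadcast values: in Flink you always broadcast the contents of DataSets (attached to an operator with withBroadcastSet and read inside a rich function via getRuntimeContext.getBroadcastVariable). That way you avoid having to collect the result on some central node from which it is then broadcast.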

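treeAggregate is an aggregation operation which is partly executed on the cluster: every partition is first folded with seqOp, and the partial results are then merged with combOp in a multi-level tree, so that only a few of them have to be combined on the driver. It is similar to a combinable reduce operation in Flink. However, you can choose an arbitrary result type (similar to a fold operation compared to a reduce operation); here, for example, the elements are Vectors but the result is a BDV. You can do the same with Flink if you first apply a combineGroup function on the DataSet, which computes partial results locally on each partition, and then a reduce function which merges those partial results.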
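To make the two-phase idea concrete without a cluster, here is a local simulation in plain Scala. It is not Flink (or Spark) API: the partitions are modeled as a hypothetical Seq[Seq[T]], phase 1 folds each partition into a partial result of an arbitrary type (the role of seqOp in treeAggregate, or of a combineGroup function in Flink), and phase 2 merges the partial results (the role of combOp, or of reduce). The vector v is simply captured by the closure here; in a Flink job it would reach the combine function as a broadcast DataSet.

```scala
// Local simulation only (not Flink API): "partitions" are a hypothetical Seq[Seq[T]]
object TwoPhaseAggregateSketch {
  type Vec = Array[Double]

  // Phase 1: fold each partition with seqOp, starting from a fresh zero value
  //          (the accumulator is mutable, so partitions must not share one instance).
  // Phase 2: merge the per-partition partial results with combOp.
  def aggregate[T, U](partitions: Seq[Seq[T]], zero: () => U)(
      seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    require(partitions.nonEmpty, "need at least one partition")
    val partials = partitions.map(_.foldLeft(zero())(seqOp))
    partials.reduce(combOp)
  }

  // Gramian-vector product A^T A v with the same seqOp/combOp logic as the Spark code
  def gramianTimes(partitions: Seq[Seq[Vec]], v: Vec): Vec =
    aggregate[Vec, Vec](partitions, () => new Array[Double](v.length))(
      seqOp = (u, r) => {
        val a = r.indices.map(i => r(i) * v(i)).sum // a = r . v
        r.indices.foreach(i => u(i) += a * r(i))    // u += a * r, in place
        u
      },
      combOp = (u1, u2) => {
        u1.indices.foreach(i => u1(i) += u2(i))     // u1 += u2, in place
        u1
      })
}
```

Splitting the rows Array(1.0, 2.0) and Array(3.0, 4.0) over two partitions (plus an empty one) gives Array(24.0, 34.0) for v = Array(1.0, 1.0), the same as keeping them in a single partition. Unlike treeAggregate, this sketch merges the partial results in a single level rather than in a multi-level tree.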

Cheers,
Till


On Thu, Feb 4, 2016 at 3:13 PM, Lydia Ickler wrote: