Parameters inside an iteration?

Boden, Christoph

Dear Flink Community,


Is there a compact and efficient way to get parameters that are known at run time, but not at compile time, inside an iteration? I tried the following:


Define an object with the parameters:

object iterationVariables {
  var numDataPoints = 1
  var lambda = 0.2
  var stepSize = 0.01
}

Update it in the driver before starting the iteration:

iterationVariables.numDataPoints = numDP
iterationVariables.lambda = l
iterationVariables.stepSize = s

Then use it inside the iteration:


val resultingWeights = weightVector.iterate(numIterations) {
  weights => {

    val computeGradient = new RichMapFunction[LabeledPoint, DenseVector[Double]] {
      var originalW: DenseVector[Double] = _

      // Fetch the current weight vector from the broadcast set once per superstep.
      override def open(parameters: Configuration): Unit = {
        originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
      }

      override def map(dp: LabeledPoint): DenseVector[Double] = {
        // The step size decays with the square root of the superstep number.
        val learning_rate: Double = iterationVariables.stepSize / Math.sqrt(getIterationRuntimeContext().getSuperstepNumber.toDouble)

        val sumElement = (dp.features.toDenseVector * (dp.label - mlutils.logisticFunction(originalW, (dp.features)))
          - (iterationVariables.lambda / iterationVariables.numDataPoints.toDouble) * originalW
          ) * learning_rate

        sumElement
      }
    }

    val newWeights: DataSet[DenseVector[Double]] = weights.union(data.map(computeGradient).withBroadcastSet(weights, WEIGHT_VECTOR).reduce{_ + _}).reduce{_ + _}
    newWeights
  }
}


This worked perfectly fine in local mode. However, once deployed on an actual cluster, iterationVariables inside the iteration returns the values from the original object definition (e.g. numDataPoints = 1) rather than the updated values set later in the driver, which leads to wrong results in the computation.

So once again: is there a way to get parameters that will only be known at run time inside an iteration?

Best regards,
Christoph Boden 



Re: Parameters inside an iteration?

Vasiliki Kalavri
Hi Christoph,

If I understand correctly what you want to do, making your RichMapFunction a standalone class and passing your object to the constructor should work.
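A minimal sketch of that idea, written without the Flink dependency so it runs on its own. The names IterationParams, ComputeGradient and roundTrip are hypothetical, and the gradient is reduced to two scalar helpers. In the real job the class would presumably extend RichMapFunction[LabeledPoint, DenseVector[Double]] and be passed as data.map(new ComputeGradient(params)). The serialization round trip only approximates what happens when a function instance is shipped to a worker: the constructor arguments travel as fields of the instance.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical immutable parameter holder; case classes are serializable.
final case class IterationParams(numDataPoints: Int, lambda: Double, stepSize: Double)

// Stand-in for the standalone RichMapFunction (assumption: the real class
// would extend RichMapFunction and use these values inside map()).
// The constructor argument becomes a field and is serialized with the instance.
class ComputeGradient(params: IterationParams) extends Serializable {
  // Step size decayed by the square root of the superstep number.
  def learningRate(superstep: Int): Double =
    params.stepSize / math.sqrt(superstep.toDouble)

  // Regularization factor applied to a single (scalar) weight.
  def regularization(weight: Double): Double =
    (params.lambda / params.numDataPoints.toDouble) * weight
}

object ConstructorParamsDemo {
  // Serialize and deserialize a value, roughly mimicking shipment to another JVM.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // Values that are only known at run time in the driver.
    val params = IterationParams(numDataPoints = 1000, lambda = 0.2, stepSize = 0.01)
    val shipped = roundTrip(new ComputeGradient(params))
    println(shipped.learningRate(4))
    println(shipped.regularization(1.0))
  }
}
```

A Scala object, by contrast, is a per-JVM singleton that is initialized separately in each JVM, so changes made to its vars in the driver are not carried along to the workers. In local mode everything runs in a single JVM, which would explain why the original version appeared to work there.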

Cheers,
-Vasia.

On 5 July 2016 at 18:16, Boden, Christoph <[hidden email]> wrote:

[...]