FlinkML 0.10.1 - Using SparseVectors with MLR does not work

FlinkML 0.10.1 - Using SparseVectors with MLR does not work

Sourigna Phetsarath
All:

I'm trying to use SparseVectors with FlinkML 0.10.1, but it does not seem to work. Here is a unit test I created to reproduce the problem:


package com.aol.ds.arc.ml.poc.flink 

import org.junit.After
import org.junit.Before
import org.slf4j.LoggerFactory
import org.apache.flink.test.util.ForkableFlinkMiniCluster
import scala.concurrent.duration.FiniteDuration
import java.util.concurrent.TimeUnit
import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.runtime.StreamingMode
import org.apache.flink.test.util.TestEnvironment
import org.junit.Test
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.SparseVector
import org.apache.flink.api.scala._
import org.apache.flink.ml.regression.MultipleLinearRegression
import org.apache.flink.ml.math.DenseVector

class FlinkMLRTest {
  val logger = LoggerFactory.getLogger(getClass.getName)
  var cluster: Option[ForkableFlinkMiniCluster] = None
  val parallelism = 4
  val DEFAULT_AKKA_ASK_TIMEOUT = 1000
  val DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS)
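  // Spin up a local ForkableFlinkMiniCluster and register it as the default execution context.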
  @Before
  def doBefore(): Unit = {
    val cl = TestBaseUtils.startCluster(
      1,
      parallelism,
      StreamingMode.BATCH_ONLY,
      false,
      false,
      true)
    val clusterEnvironment = new TestEnvironment(cl, parallelism)
    clusterEnvironment.setAsContext()
    cluster = Some(cl)
  }
  @After
  def doAfter(): Unit = {
    cluster.foreach(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
  }
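  // Reproduces the bug: fitting MLR on SparseVector features fails inside BLAS.axpy.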
  @Test
  def testMLR(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val training = Seq(
      new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
      new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
    val testing = Seq(
      new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 1.0, 1.0))),
      new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
    val trainingDS = env.fromCollection(training)
    val testingDS = env.fromCollection(testing)
    trainingDS.print()
    val mlr = MultipleLinearRegression()
      .setIterations(100)
      .setStepsize(2)
      .setConvergenceThreshold(0.001)
    mlr.fit(trainingDS)
    val weights = mlr.weightsOption match {
      case Some(ws) => ws.collect()
      case None => throw new Exception("Could not calculate the weights.")
    }
    if (logger.isInfoEnabled())
      logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
    testingDS.print()
    val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
    if (logger.isInfoEnabled()) {
      logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(",")}")
    }
  }
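  // Control case: the identical pipeline succeeds when the features are DenseVectors.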
  @Test
  def testMLR_DenseVector(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val training = Seq(
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))
    val testing = Seq(
      new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
      new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))
    val trainingDS = env.fromCollection(training)
    val testingDS = env.fromCollection(testing)
    trainingDS.print()
    val mlr = MultipleLinearRegression()
      .setIterations(100)
      .setStepsize(2)
      .setConvergenceThreshold(0.001)
    mlr.fit(trainingDS)
    val weights = mlr.weightsOption match {
      case Some(ws) => ws.collect()
      case None => throw new Exception("Could not calculate the weights.")
    }
    if (logger.isInfoEnabled())
      logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
    testingDS.print()
    val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
    if (logger.isInfoEnabled()) {
      logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(",")}")
    }
  }
}

It fails with this error:

java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.flink.ml.math.SparseVector.
at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:181)
at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:177)
at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
at org.apache.flink.runtime.operators.AllReduceDriver.run(AllReduceDriver.java:132)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:144)
at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)

If SparseVectors are not supported yet, when can we expect MLR to support them?

Thanks in advance for any information that you can provide.
--

Gna Phetsarath
System Architect // AOL Platforms // Data Services // Applied Research Chapter
770 Broadway, 5th Floor, New York, NY 10003
o: 212.402.4871 // m: 917.373.7363
vvmr: 8890237 
aim: sphetsarath20 t: @sourigna


Re: FlinkML 0.10.1 - Using SparseVectors with MLR does not work

Chiwan Park
Hi Gna,

Thanks for reporting the problem. SparseVector is not currently supported, because the level-1 operations in FlinkML's BLAS library don't support it. I've filed a JIRA issue for this [1].

Maybe I can send a patch to solve this in a few days.

[1]: https://issues.apache.org/jira/browse/FLINK-3330
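For illustration, the failing call is BLAS.axpy(a, x, y), which computes y += a * x and requires the destination y to be a DenseVector. Until a fix is released, one possible workaround (a rough sketch on my side, using only the public Vector trait; the densify helper below is just an illustration, not part of FlinkML) is to convert the features to dense vectors before calling fit:

import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

// Hypothetical helper, not part of FlinkML: copy the features into a
// DenseVector using only Vector.size and Vector.apply(i).
def densify(lv: LabeledVector): LabeledVector = {
  val dense = new DenseVector(Array.tabulate(lv.vector.size)(i => lv.vector(i)))
  new LabeledVector(lv.label, dense)
}

// e.g. in the test above: mlr.fit(trainingDS.map(densify _))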

Regards,
Chiwan Park

Re: FlinkML 0.10.1 - Using SparseVectors with MLR does not work

Till Rohrmann

Hi Sourigna,

it turned out to be a bug in the GradientDescent implementation, which cannot handle sparse gradients. That is not problematic by itself, because the sum of the gradient vectors is usually dense even if the individual gradients are sparse. We simply forgot to initialize the reduce operation's initial vector as a dense vector. I've created a PR [1] which should fix the problem. Once it has been reviewed, it should be merged within the next few days.

[1] https://github.com/apache/flink/pull/1587
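
To make the failure mode concrete, here is a minimal sketch (my own illustration, assuming the public BLAS object shown in the stack trace; this is not the PR's actual diff): axpy(a, x, y) computes y += a * x and only accepts a dense y, so the reduction works as soon as its accumulator is dense.

import org.apache.flink.ml.math.{BLAS, DenseVector, SparseVector}

// A sparse per-example gradient, like the ones MLR produces here.
val gradient = new SparseVector(10, Array(0, 3), Array(1.0, 1.0))

// Fails: seeding the reduce with the first (sparse) gradient makes the
// accumulator itself sparse, so axpy throws the IllegalArgumentException above.
// BLAS.axpy(1.0, gradient, new SparseVector(10, Array(0), Array(0.0)))

// Works: start the reduction from a dense zero vector, which is what the
// fix does conceptually.
val accumulator = new DenseVector(Array.fill(10)(0.0))
BLAS.axpy(1.0, gradient, accumulator)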

Cheers,
Till

