All:

I'm trying to use SparseVectors with FlinkML 0.10.1. It does not seem to be working. Here is a UnitTest that I created to recreate the problem: package com.aol.ds.arc.ml.poc.flink (the full test and the error it fails with are quoted in the reply below). If SparseVectors are not supported, when can we expect them to be supported for MLR?
Hi Gna,

Thanks for reporting the problem. SparseVector is currently not supported because the level 1 operations in the FlinkML BLAS library do not support it. I have filed this in JIRA [1] and may be able to send a patch fixing it within a few days.

[1]: https://issues.apache.org/jira/browse/FLINK-3330

Regards,
Chiwan Park

> On Feb 4, 2016, at 5:39 AM, Sourigna Phetsarath <[hidden email]> wrote:
>
> All:
>
> I'm trying to use SparseVectors with FlinkML 0.10.1. It does not seem to be working. Here is a UnitTest that I created to recreate the problem:
>
> package com.aol.ds.arc.ml.poc.flink
>
> import org.junit.After
> import org.junit.Before
> import org.slf4j.LoggerFactory
> import org.apache.flink.test.util.ForkableFlinkMiniCluster
> import scala.concurrent.duration.FiniteDuration
> import java.util.concurrent.TimeUnit
> import org.apache.flink.test.util.TestBaseUtils
> import org.apache.flink.runtime.StreamingMode
> import org.apache.flink.test.util.TestEnvironment
> import org.junit.Test
> import org.apache.flink.ml.common.LabeledVector
> import org.apache.flink.ml.math.SparseVector
> import org.apache.flink.api.scala._
> import org.apache.flink.ml.regression.MultipleLinearRegression
> import org.apache.flink.ml.math.DenseVector
>
> class FlinkMLRTest {
>   var Logger = LoggerFactory.getLogger(getClass.getName)
>   var cluster: Option[ForkableFlinkMiniCluster] = None
>   val parallelism = 4
>   val DEFAULT_AKKA_ASK_TIMEOUT = 1000
>   val DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS)
>
>   @Before
>   def doBefore(): Unit = {
>     val cl = TestBaseUtils.startCluster(1, parallelism, StreamingMode.BATCH_ONLY, false, false, true)
>     val clusterEnvironment = new TestEnvironment(cl, parallelism)
>     clusterEnvironment.setAsContext()
>     cluster = Some(cl)
>   }
>
>   @After
>   def doAfter(): Unit = {
>     cluster.map(c => TestBaseUtils.stopCluster(c, DEFAULT_TIMEOUT))
>   }
>
>   @Test
>   def testMLR() {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val training = Seq(
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))),
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 1, 5, 9), Array(1.0, 1.0, 1.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2), Array(0.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
>     val testing = Seq(
>       new LabeledVector(1.0, new SparseVector(10, Array(0, 3), Array(1.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0, 2, 3), Array(0.0, 1.0, 1.0))),
>       new LabeledVector(0.0, new SparseVector(10, Array(0), Array(0.0))))
>     val trainingDS = env.fromCollection(training)
>     val testingDS = env.fromCollection(testing)
>     trainingDS.print()
>     val mlr = MultipleLinearRegression()
>       .setIterations(100)
>       .setStepsize(2)
>       .setConvergenceThreshold(0.001)
>     mlr.fit(trainingDS)
>     val weights = mlr.weightsOption match {
>       case Some(weights) => weights.collect()
>       case None => throw new Exception("Could not calculate the weights.")
>     }
>     if (Logger.isInfoEnabled())
>       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
>     testingDS.print()
>     val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
>     if (Logger.isInfoEnabled()) {
>       Logger.info(predictions.collect().mkString(","))
>     }
>   }
>
>   @Test
>   def testMLR_DenseVector() {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val training = Seq(
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))
>     val testing = Seq(
>       new LabeledVector(1.0, DenseVector(1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)),
>       new LabeledVector(0.0, DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)))
>     val trainingDS = env.fromCollection(training)
>     val testingDS = env.fromCollection(testing)
>     trainingDS.print()
>     val mlr = MultipleLinearRegression()
>       .setIterations(100)
>       .setStepsize(2)
>       .setConvergenceThreshold(0.001)
>     mlr.fit(trainingDS)
>     val weights = mlr.weightsOption match {
>       case Some(weights) => weights.collect()
>       case None => throw new Exception("Could not calculate the weights.")
>     }
>     if (Logger.isInfoEnabled())
>       Logger.info(s"*** WEIGHTS: ${weights.mkString(";")}")
>     testingDS.print()
>     val predictions = mlr.evaluate(testingDS.map(x => (x.vector, x.label)))
>     if (Logger.isInfoEnabled()) {
>       Logger.info(s"**** PREDICTIONS: ${predictions.collect().mkString(",")}")
>     }
>   }
> }
>
> It fails with this error:
>
> java.lang.IllegalArgumentException: axpy only supports adding to a dense vector but got type class org.apache.flink.ml.math.SparseVector.
>     at org.apache.flink.ml.math.BLAS$.axpy(BLAS.scala:60)
>     at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:181)
>     at org.apache.flink.ml.optimization.GradientDescent$$anonfun$org$apache$flink$ml$optimization$GradientDescent$$SGDStep$2.apply(GradientDescent.scala:177)
>     at org.apache.flink.api.scala.DataSet$$anon$7.reduce(DataSet.scala:583)
>     at org.apache.flink.runtime.operators.AllReduceDriver.run(AllReduceDriver.java:132)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>     at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:144)
>     at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>     at java.lang.Thread.run(Thread.java:745)
>
> If SparseVectors are not supported, when can we expect them to be supported for MLR?
>
> Thanks in advance for any information that you can provide.
> --
> Gna Phetsarath
> System Architect // AOL Platforms // Data Services // Applied Research Chapter
> 770 Broadway, 5th Floor, New York, NY 10003
> o: 212.402.4871 // m: 917.373.7363
> vvmr: 8890237 aim: sphetsarath20 t: @sourigna
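[Editor's aside, not part of the original thread: the restriction behind the `axpy only supports adding to a dense vector` error can be illustrated in plain Scala. The sketch below is a simplified BLAS level 1 axpy (y := a*x + y), not FlinkML's actual implementation; it shows why such routines typically require the accumulator y to be dense while the addend x may be sparse: accumulating into a sparse y would change y's non-zero structure on every step.]

```scala
// Simplified sketch of BLAS level 1 axpy (y := a * x + y).
// Not FlinkML's code; x is given in sparse form (indices, values),
// y is a plain dense array. Writing into a dense y is O(nnz(x));
// a sparse y would need its index structure rebuilt on each update,
// which is why implementations commonly reject sparse targets.
object AxpySketch {
  def axpySparseIntoDense(a: Double,
                          xIndices: Array[Int],
                          xValues: Array[Double],
                          y: Array[Double]): Unit = {
    var i = 0
    while (i < xIndices.length) {
      y(xIndices(i)) += a * xValues(i)
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val y = Array(1.0, 1.0, 1.0, 1.0)
    // sparse x with non-zeros 1.0 at index 0 and 3.0 at index 2
    axpySparseIntoDense(2.0, Array(0, 2), Array(1.0, 3.0), y)
    println(y.mkString(","))  // 3.0,1.0,7.0,1.0
  }
}
```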
Hi Sourigna,

it turned out to be a bug; the fix is in [1].

[1] https://github.com/apache/flink/pull/1587

Cheers,

> On Thu, Feb 4, 2016 at 5:09 AM, Chiwan Park <[hidden email]> wrote:
>
> Hi Gna,
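[Editor's aside, not part of the original thread: until the fix lands, readers on 0.10.1 could work around the error by densifying each sparse vector before training, since the `testMLR_DenseVector` test above runs fine. The sketch below is a hypothetical helper, not a FlinkML API; it only shows the expansion of a (size, indices, values) triple into a dense array, which in real code you would wrap in `org.apache.flink.ml.math.DenseVector`.]

```scala
// Hypothetical helper (plain Scala, no Flink dependency): expand a
// sparse (size, indices, values) representation into a dense array,
// so training data can be fed to MLR as DenseVectors instead.
object Densify {
  def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
    val dense = new Array[Double](size)  // initialized to all 0.0
    var i = 0
    while (i < indices.length) {
      dense(indices(i)) = values(i)
      i += 1
    }
    dense
  }

  def main(args: Array[String]): Unit = {
    // first training vector from the test above:
    // SparseVector(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0))
    println(toDense(10, Array(0, 2, 3), Array(1.0, 1.0, 1.0)).mkString(","))
    // 1.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0
  }
}
```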