Hi Flinker,

I am trying to implement a quadratic distribution, i.e. I would like to choose an element from a dataset with probability proportional to its squared value. In Python this would look like this:

    s = numpy.cumsum(residual**2)
    x = numpy.random.rand() * s[-1]
    return residual[numpy.sum(x > s)]

With Flink it is somewhat more complicated. I gave it a try:

    import util.Random

    val X = DataSource(XFile, CsvInputFormat[Float])
    val Y = DataSource(YFile, CsvInputFormat[Float])

    // take the square of each value
    val X_2 = X map { x => (x*x, x) }

    // calculate the sum of squares
    val X_sum = X_2 reduce { (x1, x2) => (x1._1 + x2._1, 0) } map { x => x._1 }

    // choose a random value in our range
    val y = X_sum map { Random.nextFloat * _ }

    // build the cumulative sum and find the value we are searching for
    val center = X_2 map { x =>
      (0.0f, x._1, x._2) // sum, x^2, x
    } reduce { (x1, x2) =>
      if (x1._1 > y) { // already found the value we searched for
        x1
      } else {
        if (x1._1 + x2._2 > y) { // this is the value we are searching for
          (x1._1 + x2._2, x2._2, x2._3)
        } else {
          (x1._1 + x2._2, x1._2, x2._3) // just go on with the cumulative sum
        }
      }
    } map { _._3 } // we just need the initial value

    val output = center // map { x => println(x); x }
    val sink = output.write("/tmp/test", CsvOutputFormat[Float], "Center output")

My problem is that I need to get the value stored in y into the reduce statement in order to find the center value. Unfortunately I have no idea how to achieve that. If somebody knows a way, I would be very thankful. The same goes if someone knows an easier way to solve this problem!
Many thanks in advance! Cheers Max
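For reference, the sampling step described in the question can be written as a small self-contained function. This is only a sketch under assumptions: it uses the Python standard library instead of numpy, and the name sample_by_square and its rng parameter are made up for illustration, not taken from the thread.

```python
import bisect
import itertools
import random


def sample_by_square(values, rng=random):
    """Pick one element with probability proportional to its squared value."""
    if not values:
        raise ValueError("values must not be empty")
    # Running totals of the squares: cumulative[i] == sum(v*v for v in values[:i+1]).
    cumulative = list(itertools.accumulate(v * v for v in values))
    total = cumulative[-1]
    if total <= 0:
        raise ValueError("at least one value must be non-zero")
    # Uniform threshold in [0, total).
    threshold = rng.random() * total
    # Index of the first running total that is strictly greater than the threshold.
    index = bisect.bisect_right(cumulative, threshold)
    # Guard against floating-point rounding pushing the threshold up to total.
    return values[min(index, len(values) - 1)]
```

Passing a seeded random.Random instance as rng makes the draws reproducible. Elements equal to zero are never picked, because their running total does not exceed that of the element before them.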
|
Hi, the Java API has so-called broadcast variables. These can be used to pass the output of one operation as an additional input to another operator. The feature is not available in the Scala API though, or am I wrong here? I'm currently working on bringing the Scala API to feature parity with the Java API.
Aljoscha
On Wed, Aug 13, 2014 at 5:51 PM, Maximilian Alber <[hidden email]> wrote:
|
Thanks for the quick reply. OK, but is there a way to get the single element of a DataSet into a variable?
Kind regards,
Max!
On Thu, Aug 14, 2014 at 11:13 AM, Aljoscha Krettek <[hidden email]> wrote:
|
No, unfortunately that's not possible right now, because a DataSet only represents an execution that is run when the program is executed. So while you are building your program by chaining operations together, the actual data is not yet available. I hope that helps, but the whole thing can be a bit confusing, so just ask if you need clarification.
Aljoscha
On Thu, Aug 14, 2014 at 3:01 PM, Maximilian Alber <[hidden email]> wrote:
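The point about deferred execution can be illustrated with a toy model. The LazyDataSet class below is purely hypothetical and is not Flink's DataSet; it only mimics the idea that chaining operations records a plan, and that data exists once the plan is executed.

```python
class LazyDataSet:
    """Toy stand-in for a deferred data set; NOT Flink's DataSet class."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that produces the input on execution
        self._steps = tuple(steps)   # functions recorded so far, not yet applied

    def map(self, fn):
        # Only records the step; no element is touched here.
        return LazyDataSet(self._source, self._steps + (fn,))

    def execute(self):
        # The data is read and transformed only from this point on.
        result = []
        for item in self._source():
            for fn in self._steps:
                item = fn(item)
            result.append(item)
        return result


calls = []  # records when the source is actually read


def load():
    calls.append("load")
    return [1.0, 2.0, 3.0]


plan = LazyDataSet(load).map(lambda x: x * x)
calls_before_execute = list(calls)  # still empty: nothing has been read yet
result = plan.execute()             # the source is read here
```

In this model there is no value to pull out of plan while the program is being assembled, which matches the reason given above for why a DataSet element cannot simply be assigned to a variable.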
|
Yeah, I got that. What I had in mind was something like a variable that can be used as a broadcast variable, so that at runtime Flink supplies it to the function, e.g. a map function.
It would be something like a shortcut. Right now I could already use a broadcast variable, extract the single value it holds inside the open function, and then supply it to the apply function. Am I right about that?
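The open/apply idea described here can be modelled in plain Python. Everything in this sketch is an assumption made for illustration: ThresholdPicker and run_reduce are hypothetical names, the dictionary lookup stands in for a broadcast variable named "Y", and the driver runs sequentially with an explicit initial accumulator, which a distributed reduce does not provide.

```python
import functools


class ThresholdPicker:
    """Hypothetical analogue of a function with an open() and an apply() method."""

    def open(self, broadcast_sets):
        # The broadcast set holds exactly one element: the random threshold y.
        self.y = next(iter(broadcast_sets["Y"]))

    def apply(self, acc, item):
        # acc is (running_sum, chosen_value, found); item is (square, value).
        if acc[2]:
            return acc  # already found the value we searched for
        running = acc[0] + item[0]
        return (running, item[1], running > self.y)


def run_reduce(fn, items, broadcast_sets, initial):
    """Toy sequential driver: call open() once, then fold the items in order."""
    fn.open(broadcast_sets)
    return functools.reduce(fn.apply, items, initial)


values = [1.0, 2.0, 3.0]
pairs = [(v * v, v) for v in values]  # cumulative sums of squares: 1, 5, 14
picked = run_reduce(ThresholdPicker(), pairs, {"Y": [4.2]}, (0.0, None, False))[1]
```

With the threshold 4.2 the running sum first exceeds y at the second element, so picked is 2.0. The sketch depends on the elements being folded in a fixed order.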
Kind regards,
Max!
On Thu, Aug 14, 2014 at 5:24 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Yes, you are right. But to my knowledge broadcast variables are not yet supported in the Scala API. We are working on it, but it is not ready yet.
On Thu, Aug 14, 2014 at 5:41 PM, Maximilian Alber <[hidden email]> wrote:
|
Ok, thank you!
Kind regards,
Max!
On Thu, Aug 14, 2014 at 6:00 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Support to get Elements from a DataSet back is in progress. There is a pull request with a temporary solution: https://github.com/apache/incubator-flink/pull/94
On Thu, Aug 14, 2014 at 9:05 AM, Maximilian Alber <[hidden email]> wrote:
|
For now, is there any way or workaround to use broadcast variables in Scala? I tried to use the Java classes, but it did not work out nicely, see below:

    val center = X_2 map { x => (0.0f, x, -1) } reduce(new JReduceFunction {
      var y: Float = 0.0f

      override def open(parameters: Configuration) = {
        val ySet = getRuntimeContext().getBroadcastVariable("Y")
        y = ySet.iterator().next()
      }

      override def apply(x1: (Float, Vector, Int), x2: (Float, Vector, Int)): (Float, Vector, Int) = {
        if (x1._3 != -1) x1
        else {
          if (x1._1 + x2._1 > y) (x1._1 + x2._1, x2._2, x2._2.id)
          else (x1._1 + x2._1, x2._2, -1)
        }
      }
    }).withBroadcastSet(y, "Y") map { x => x._1 }

The problem is that the reduce function returns a Scala class, which has no member withBroadcastSet. Thank you!
Kind regards,
Max!
On Thu, Aug 14, 2014 at 7:45 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi Maximilian!
As far as I know, Aljoscha is currently syncing the Scala API with the Java API, so it should be available soon. I am afraid that for now you have to stay with the Java API. You can later beautify the code by switching to the Scala API - the feature set and syntax should then be so comparable that the switch is minimal (mostly replacing the verbose anonymous classes with concise function literals).
Stephan
On Mon, Aug 18, 2014 at 6:51 PM, Maximilian Alber <[hidden email]> wrote:
|
Ok, thank you! Don't let me stress you!
Cheers
Max
On Wed, Aug 20, 2014 at 8:56 PM, Stephan Ewen <[hidden email]> wrote:
|