Hello all,

I am trying to replicate the code in the docs (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions), but I keep getting the following exception:

The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.

This is what I have:

    import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction}
    import org.apache.flink.util.Collector

    class MyCombinableGroupReducer
      extends GroupReduceFunction[(Double, Double), Double]
      with GroupCombineFunction[(Double, Double), (Double, Double)] {
      import collection.JavaConverters._

      override def reduce(
          in: java.lang.Iterable[(Double, Double)],
          out: Collector[Double]): Unit = {
        val r = in.asScala.reduce((a, b) => // ERROR HAPPENS HERE
          (a._1, a._2 + b._2)
        )
        out.collect(r._1 + r._2)
      }

      override def combine(
          in: java.lang.Iterable[(Double, Double)],
          out: Collector[(Double, Double)]): Unit = {
        ???
      }
    }

Where am I traversing `in` a second time? Maybe it's the call to `asScala`?

Best,
-- 
Alejandro Alcalde - elbauldelprogramador.com
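The exception message already states the rule: only the first call to `iterator()` succeeds. A minimal, Flink-free sketch of such an Iterable makes the failure mode easy to reproduce. `OneShotIterable` is a hypothetical class that only imitates the behavior described in the message; it is not Flink's implementation. The sketch uses `scala.collection.JavaConverters` as in the original post (deprecated, but still available in Scala 2.13).

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Flink's group Iterable (not Flink's actual class):
// it hands out one iterator and rejects every later iterator() call.
final class OneShotIterable[T](data: Seq[T]) extends java.lang.Iterable[T] {
  private var consumed = false

  override def iterator(): java.util.Iterator[T] = {
    if (consumed)
      throw new IllegalStateException(
        "The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.")
    consumed = true
    data.iterator.asJava
  }
}

val group = new OneShotIterable(Seq((1.0, 2.0), (1.0, 3.0)))

// The first iterator() call succeeds and yields the group's elements.
val first = group.iterator()
println(first.next())  // (1.0,2.0)

// Any second call fails, no matter who makes it (user code or a Scala wrapper).
val secondCallFailed =
  try { group.iterator(); false }
  catch { case _: IllegalStateException => true }
println(secondCallFailed)  // true
```

Any code path, including a Scala collection method on the `asScala` wrapper, that calls `iterator()` on such an Iterable more than once hits the same exception.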
A java.lang.Iterable is expected to provide iterators again and again, so code that receives one may call iterator() more than once.

On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde wrote:
Then how am I supposed to implement that function?
On 09/14/2018 05:29 PM, 杨力 wrote:
> A java.lang.Iterable is expected to provide iterators again and again.
Hi Alejandro,

The Iterable returned by asScala is only a thin wrapper around `in`, and calling reduce() on it invokes iterator() on the underlying Iterable a second time (with Scala 2.11/2.12 collections, reduce() first checks whether the collection is empty, which calls iterator(), and then traverses it, which calls iterator() again). The Iterable that Flink passes in can hand out only one iterator, because the group may be backed by multiple sorted files that have been spilled to disk and are merge-sorted while iterating.

I'm actually surprised that you found this code in the documentation. Maybe this worked with Scala 2.10...

Anyway, there are a few ways to work around this:

1) Use a ReduceFunction (after all, you are calling reduce() in the GroupReduceFunction).
2) Define the group reduce function as a function with native Scala iterators:

    import org.apache.flink.util.Collector

    // Can be passed to reduceGroup() on a grouped DataSet
    val myReduceFunc = (in: Iterator[(Double, Double)], out: Collector[Double]) => {
      val r = in.reduce((a, b) => (a._1, a._2 + b._2))
      out.collect(r._1 + r._2)
    }

I've created a Jira issue to report the broken docs. [1]

Best,
Fabian

2018-09-14 18:52 GMT+02:00 Alejandro wrote:
> Then how am I supposed to implement that function?
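Another way to keep the existing GroupReduceFunction/GroupCombineFunction class is to call `iterator()` on `in` exactly once and work on the resulting Scala `Iterator`, which is similar in spirit to the native-iterator option above. The sketch below strips the Flink types so it runs on its own. `OneShotIterable`, `merge`, `combineGroup`, and `reduceGroup` are hypothetical names, the `combine()` logic is an assumption (the original post leaves it as `???`), and the first tuple field is assumed to be the grouping key, since the original reduce() keeps `a._1`. Inside the real `MyCombinableGroupReducer`, each result would be handed to `out.collect(...)`.

```scala
import scala.collection.JavaConverters._

// Hypothetical stand-in for Flink's group Iterable (not a Flink class):
// only the first call to iterator() succeeds.
final class OneShotIterable[T](data: Seq[T]) extends java.lang.Iterable[T] {
  private var consumed = false
  override def iterator(): java.util.Iterator[T] = {
    if (consumed) throw new IllegalStateException("iterator() may only be called once")
    consumed = true
    data.iterator.asJava
  }
}

// Merge step shared by combine and reduce: keep the first field (assumed to be
// the grouping key) and add up the second field, as in the original reduce().
val merge: ((Double, Double), (Double, Double)) => (Double, Double) =
  (a, b) => (a._1, a._2 + b._2)

// Hypothetical body of combine(): call iterator() exactly once, then reduce the
// resulting Scala Iterator into one partial aggregate.
def combineGroup(in: java.lang.Iterable[(Double, Double)]): (Double, Double) =
  in.iterator().asScala.reduce(merge)

// Hypothetical body of reduce(): the same single iterator() call, followed by
// the final value that the original code passes to out.collect.
def reduceGroup(in: java.lang.Iterable[(Double, Double)]): Double = {
  val r = in.iterator().asScala.reduce(merge)
  r._1 + r._2
}

// Simulate two locally combined partial groups followed by the final reduce.
val partials = Seq(
  combineGroup(new OneShotIterable(Seq((1.0, 2.0), (1.0, 3.0)))),
  combineGroup(new OneShotIterable(Seq((1.0, 4.0))))
)
println(partials)  // List((1.0,5.0), (1.0,4.0))

val total = reduceGroup(new OneShotIterable(partials))
println(total)     // 10.0
```

Because `merge` keeps the first field and sums the second, applying it to already-combined partial results gives the same total as reducing the raw records (key 1.0 plus 2.0 + 3.0 + 4.0), which is the property a combiner needs.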