CombinableGroupReducer says The Iterable can be iterated over only once


Alejandro Alcalde
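Hello all,

I am trying to replicate the code in the Docs (https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions)

But I keep getting the following exception: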

The Iterable can be iterated over only once. Only the first call to 'iterator()' will succeed.

This is what I have:

import org.apache.flink.api.common.functions.{GroupCombineFunction, GroupReduceFunction}
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

class MyCombinableGroupReducer
    extends GroupReduceFunction[(Double, Double), Double]
    with GroupCombineFunction[(Double, Double), (Double, Double)] {

  override def reduce(
      in: java.lang.Iterable[(Double, Double)],
      out: Collector[Double]): Unit = {
    val r = in.asScala.reduce((a, b) => // ERROR HAPPENS HERE
      (a._1, a._2 + b._2)
    )
    out.collect(r._1 + r._2)
  }

  override def combine(
      in: java.lang.Iterable[(Double, Double)],
      out: Collector[(Double, Double)]): Unit = {
    ??? // combine step not implemented yet
  }
}

Where am I traversing `in` a second time? Maybe it is the call to `asScala`?

Best,

-- Alejandro Alcalde - elbauldelprogramador.com

Re: CombinableGroupReducer says The Iterable can be iterated over only once

杨力
A java.lang.Iterable is normally expected to hand out a fresh iterator every time iterator() is called.
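To make the contract concrete, here is a minimal sketch of a `java.lang.Iterable` that, like the one Flink passes to `reduce()`, allows only one call to `iterator()`. The class name `OneShotIterable` and the helper `OneShotDemo` are hypothetical, and the sketch throws `IllegalStateException` rather than whatever exception type Flink itself uses; it only illustrates what breaks.

```scala
import java.util.{Iterator => JIterator}

// Hypothetical stand-in for the Iterable Flink hands to a GroupReduceFunction:
// only the first call to iterator() succeeds, every later call throws.
final class OneShotIterable[T](elems: Seq[T]) extends java.lang.Iterable[T] {
  private var handedOut = false

  override def iterator(): JIterator[T] = {
    if (handedOut)
      throw new IllegalStateException(
        "The Iterable can be iterated over only once. " +
          "Only the first call to 'iterator()' will succeed.")
    handedOut = true
    val underlying = elems.iterator
    // Adapt the Scala iterator to java.util.Iterator by hand.
    new JIterator[T] {
      override def hasNext: Boolean = underlying.hasNext
      override def next(): T = underlying.next()
    }
  }
}

object OneShotDemo {
  // Returns true if asking for a second iterator fails.
  def secondCallFails(): Boolean = {
    val group = new OneShotIterable(Seq((1.0, 2.0), (1.0, 3.0)))
    group.iterator() // first call: fine
    try { group.iterator(); false }
    catch { case _: IllegalStateException => true }
  }
}
```

Any code that asks such an object for a second iterator, whether directly or hidden inside a library method, fails in the same way.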

On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde wrote: (original message shown above)

Re: CombinableGroupReducer says The Iterable can be iterated over only once

Alejandro Alcalde
Then how am I supposed to implement that function?

On 09/14/2018 05:29 PM, 杨力 wrote:

> A java.lang.Iterable is normally expected to hand out a fresh iterator every time iterator() is called.
>
> On Fri, Sep 14, 2018 at 10:53 PM Alejandro Alcalde wrote:
>
>> I am trying to replicate the code in the Docs (
>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>> )
>> [...]
--
elbauldelprogramador.com


Re: CombinableGroupReducer says The Iterable can be iterated over only once

Fabian Hueske-2
Hi Alejandro,

Wrapping the Iterable with asScala and then calling reduce() on the result ends up calling iterator() twice.
The Iterables passed to a GroupReduceFunction can only be iterated once because they are possibly backed by multiple sorted files which have been spilled to disk and are merge-sorted while iterating.
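One way to keep the original `GroupReduceFunction` and still respect this constraint is to request the iterator exactly once and fold over it by hand. The helper names `SinglePass`, `reduceOnce`, and `groupResult` below are hypothetical; the sketch uses only the JDK and the Scala standard library so it runs without Flink, and inside `reduce()` one would call `reduceOnce(in)(...)` in place of `in.asScala.reduce(...)`.

```scala
object SinglePass {
  // Hypothetical helper: reduces a java.lang.Iterable while calling
  // iterator() exactly once, so it is safe for once-only Iterables.
  def reduceOnce[T](in: java.lang.Iterable[T])(op: (T, T) => T): T = {
    val it = in.iterator() // the only call to iterator()
    if (!it.hasNext) throw new NoSuchElementException("reduceOnce on an empty Iterable")
    var acc = it.next()
    while (it.hasNext) acc = op(acc, it.next())
    acc
  }

  // The logic of MyCombinableGroupReducer.reduce without the Collector:
  // keep the first field, sum the second fields, then add the two.
  def groupResult(in: java.lang.Iterable[(Double, Double)]): Double = {
    val r = reduceOnce(in)((a, b) => (a._1, a._2 + b._2))
    r._1 + r._2
  }
}
```

Because the iterator is obtained once and consumed in a single loop, the behavior does not depend on how a particular Scala version implements `reduce` on a wrapped Java collection.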

I'm actually surprised that you found this code in the documentation. Maybe this worked for Scala 2.10...
Anyway, there are a few ways to work around this.

1) Use a ReduceFunction (after all, you are calling reduce() inside the GroupReduceFunction).
2) Define the group reduce function as a Scala function that takes a native Scala Iterator:

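import org.apache.flink.util.Collector

// A Scala function over a native scala.Iterator: reduce() consumes it in one pass.
// Pass it to the Scala API, e.g. grouped.reduceGroup(myReduceFunc).
val myReduceFunc = (in: Iterator[(Double, Double)], out: Collector[Double]) => {
  val r = in.reduce((a, b) =>
    (a._1, a._2 + b._2)
  )
  out.collect(r._1 + r._2)
}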
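Option 1 works because the per-group logic is an associative pairwise step on `(Double, Double)` followed by a final `r._1 + r._2`. In Flink's Scala API this would presumably look something like `ds.groupBy(0).reduce((a, b) => (a._1, a._2 + b._2)).map(r => r._1 + r._2)`, assuming the group key is the first field (the thread does not show the `groupBy`). The pure-Scala model below, with hypothetical names and no Flink dependency, mimics a combiner that pre-reduces each partition and checks that reducing the partial results gives the same answer as a single pass over the group.

```scala
// Pure-Scala model (no Flink): why a pairwise, associative reduce is combinable.
object CombinableModel {
  type Rec = (Double, Double)

  // The pairwise step from the thread: keep the first field, sum the second.
  val step: (Rec, Rec) => Rec = (a, b) => (a._1, a._2 + b._2)

  // Final projection, applied once per group after all reducing is done.
  def finish(r: Rec): Double = r._1 + r._2

  // One pass over the whole group.
  def direct(group: Seq[Rec]): Double = finish(group.reduce(step))

  // "Combine" each non-empty partition locally, then reduce the partial results.
  def combined(partitions: Seq[Seq[Rec]]): Double =
    finish(partitions.filter(_.nonEmpty).map(_.reduce(step)).reduce(step))
}
```

The `filter(_.nonEmpty)` guard is there because `reduce` throws on an empty collection; keeping `a._1` is only meaningful if every record in the group shares the same first field.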

I've created a Jira issue to report the broken docs. [1]

Best,
Fabian


2018-09-14 18:52 GMT+02:00 Alejandro:
> Then how am I supposed to implement that function?