Understanding iteration error message

subashbasnet
Hello all,

When I execute the following streaming code:
DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
ConnectedIterativeStreams<Point, Centroid> loop = points.iterate().withFeedbackType(Centroid.class);
DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
DataStream<Centroid> finalCentroids = loop.closeWith(centroids.broadcast());

the following exception arises:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
at org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)

If I use loop.closeWith(newCentroids.broadcast()) instead, it works fine. I don't fully understand the error message. Could you explain it in more depth, in relation to the code above?

Best Regards,
Subash Basnet

Re: Understanding iteration error message

Maximilian Michels
Hi,

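The message means that the stream you pass to closeWith() must be
derived from the iteration itself. In your code, `centroids` is
computed from `newCentroidDataStream`, which lies outside the loop, so
it cannot serve as the feedback stream. It works with `newCentroids`
because that stream is computed from `loop` (flatMap, map, keyBy,
reduce, map), i.e. it is part of the loop.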
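To make the rule concrete, below is a toy model in plain Java with no Flink dependency. It is a hypothetical simplification, not Flink's code: `Node`, `derivesFrom` and this `closeWith` are made-up names. Each `Node` stands in for a DataStream and only remembers the streams it was computed from; `closeWith` rejects a feedback stream unless the loop head is one of its transitive inputs. My understanding is that Flink's `ConnectedIterativeStreams.closeWith` performs an equivalent check on the transitive predecessors of the feedback stream's transformation, but treat that as an assumption rather than a description of the actual source.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of the rule behind the closeWith() error; not Flink code.
class IterationToyModel {

    // Stands in for a DataStream: it only remembers the streams it was computed from.
    static final class Node {
        final List<Node> inputs;

        Node(Node... inputs) {
            this.inputs = Arrays.asList(inputs);
        }
    }

    // True if 'ancestor' is reachable by following the inputs of 'stream' transitively.
    static boolean derivesFrom(Node stream, Node ancestor) {
        Deque<Node> toVisit = new ArrayDeque<>(stream.inputs);
        Set<Node> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            Node n = toVisit.pop();
            if (n == ancestor) {
                return true;
            }
            if (seen.add(n)) {
                toVisit.addAll(n.inputs);
            }
        }
        return false;
    }

    // Rejects any feedback stream that was not computed downstream of the loop head.
    static Node closeWith(Node loopHead, Node feedback) {
        if (!derivesFrom(feedback, loopHead)) {
            throw new UnsupportedOperationException(
                "Cannot close an iteration with a feedback DataStream"
                    + " that does not originate from said iteration.");
        }
        return feedback;
    }

    public static void main(String[] args) {
        Node points = new Node();                      // source outside the loop
        Node loop = new Node(points);                  // points.iterate()...
        Node newCentroids = new Node(new Node(loop));  // loop.flatMap(...)...map(...)
        Node centroids = new Node(new Node());         // newCentroidDataStream.map(...)

        closeWith(loop, newCentroids);                 // accepted
        try {
            closeWith(loop, centroids);                // rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```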

The only way to get the `centroids` DataStream into the loop is to
union/join it with the `newCentroids` stream.
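Union satisfies the rule because the combined stream has `newCentroids` as one of its inputs, so the iteration is still among its ancestors even though `centroids` is not. In the job above the change would presumably look like `loop.closeWith(newCentroids.union(centroids).broadcast())` (an untested sketch; `DataStream#union` needs both streams to have the same element type, which holds here since both are `DataStream<Centroid>`). The self-contained toy model below, with hypothetical names and no Flink dependency, shows why the ancestry check then passes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model: a union keeps the loop head among its ancestors. Not Flink code.
class UnionFeedbackToyModel {

    static final class Node {
        final List<Node> inputs;

        Node(Node... inputs) {
            this.inputs = Arrays.asList(inputs);
        }

        // Loosely analogous to DataStream#union: the result depends on both inputs.
        Node union(Node other) {
            return new Node(this, other);
        }
    }

    // Recursive ancestry check; the toy graph is acyclic, so this terminates.
    static boolean derivesFrom(Node stream, Node ancestor) {
        for (Node in : stream.inputs) {
            if (in == ancestor || derivesFrom(in, ancestor)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node loop = new Node();                        // iteration head
        Node newCentroids = new Node(new Node(loop));  // computed inside the loop
        Node centroids = new Node(new Node());         // built outside the loop

        System.out.println(derivesFrom(centroids, loop));                     // false
        System.out.println(derivesFrom(newCentroids.union(centroids), loop)); // true
    }
}
```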

Cheers,
Max

On Wed, Jul 20, 2016 at 11:33 AM, subash basnet <[hidden email]> wrote:
