Hi,
It's stating that you can't use a DataStream which was not part of the
iteration. It works with `newCentroids` because it is part of the
loop.
The only way to get the centroids DataStream in, is to union/join it
with the `newCentroids` stream.
Cheers,
Max
On Wed, Jul 20, 2016 at 11:33 AM, subash basnet <
[hidden email]> wrote:
> Hello all,
>
> When I execute the below streaming code:
> DataStream<Centroid> centroids = newCentroidDataStream.map(new
> TupleCentroidConverter());
> ConnectedIterativeStreams<Point, Centroid> loop =
> points.iterate().withFeedbackType(Centroid.class);
> DataStream<Centroid> newCentroids = loop.flatMap(new
> SelectNearestCenter(10)).map(new CountAppender()).keyBy(0)
> .reduce(new CentroidAccumulator()).map(new CentroidAverager());
> DataStream<Centroid> finalCentroids = loop.closeWith(centroids.broadcast());
>
> the following exception arises:
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot
> close an iteration with a feedback DataStream that does not originate from
> said iteration.
> at
> org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)
>
> If I use loop.closeWith(newCentroids.broadcast()) it works fine. I am not
> able to fully understand the error message. Could you explain it more in
> depth the error message in relation to above code.
>
> Best Regards,
> Subash Basnet