closeWith(...) throws an error with DataStream, but works with DataSet


subashbasnet
Hello all,

loop.closeWith(...) in the code below works fine for the DataSet API, but the same code adapted for the DataStream API throws an exception.

For DataSet:
IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .map(new CountAppender())
        .groupBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
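For readers less familiar with this pipeline: judging from the operator names, each pass assigns every point to its nearest centroid (SelectNearestCenter), attaches a count of 1 (CountAppender), sums coordinates and counts per centroid id (groupBy(0).reduce(CentroidAccumulator)), and divides to get the new centroid (CentroidAverager). The plain-Java sketch below (no Flink involved) mimics one such pass under those assumptions; the Point/Centroid records and the KMeansStep.step helper are hypothetical stand-ins, not the classes from the original program.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KMeansStep {
    // One k-means pass (hypothetical helper). Assumes at least one centroid.
    // Centroids that receive no points are dropped, like ids absent from a groupBy.
    static List<Centroid> step(List<Point> points, List<Centroid> centroids) {
        // centroid id -> {sumX, sumY, count}, like CountAppender + CentroidAccumulator
        Map<Integer, double[]> acc = new LinkedHashMap<>();
        for (Point p : points) {
            Centroid nearest = null;
            double best = Double.MAX_VALUE;
            for (Centroid c : centroids) { // like SelectNearestCenter
                double dx = p.x() - c.x(), dy = p.y() - c.y();
                double d = dx * dx + dy * dy;
                if (d < best) { best = d; nearest = c; }
            }
            double[] s = acc.computeIfAbsent(nearest.id(), k -> new double[3]);
            s[0] += p.x();
            s[1] += p.y();
            s[2] += 1;
        }
        List<Centroid> result = new ArrayList<>();
        for (Map.Entry<Integer, double[]> e : acc.entrySet()) { // like CentroidAverager
            double[] s = e.getValue();
            result.add(new Centroid(e.getKey(), s[0] / s[2], s[1] / s[2]));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Point> points = List.of(new Point(0, 0), new Point(0, 2),
                                     new Point(10, 10), new Point(12, 10));
        List<Centroid> centroids = List.of(new Centroid(1, 1, 1), new Centroid(2, 9, 9));
        // Like iterate(numIterations): feed the result back a fixed number of times.
        for (int i = 0; i < 3; i++) {
            centroids = step(points, centroids);
        }
        System.out.println(centroids);
    }
}

// Hypothetical stand-ins for the Point/Centroid types of the original program.
record Point(double x, double y) {}
record Centroid(int id, double x, double y) {}
```

The loop in main plays the role of the bounded DataSet iteration: the output of one pass is simply handed back as the input of the next.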


This works fine. Now, if I do the same operation with DataStream, as below:

IterativeStream<Centroid> loop = centroids.iterate(numIterations);
DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter())
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);

I get the following exception, as already mentioned in earlier emails:
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
    at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
    at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please point out where my understanding is wrong?

I couldn't infer much from the Javadoc descriptions of closeWith(...) in DataSet and DataStream:

For DataStream (IterativeStream.closeWith):
Closes the iteration. This method defines the end of the iterative program part that will be fed back to the start of the iteration. A common usage pattern for streaming iterations is to use output splitting to send a part of the closing data stream to the head.

For DataSet (IterativeDataSet.closeWith):
Closes the iteration. This method defines the end of the iterative program part.
Parameters:
iterationResult The data set that will be fed back to the next iteration.
Returns:
The DataSet that represents the result of the iteration, after the computation has terminated.
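The "output splitting" pattern in the IterativeStream Javadoc means that the loop body produces one stream, part of which is fed back to the iteration head through closeWith(...) while the rest leaves the loop as output. A common toy example is a body that decrements numbers, feeding back values > 0 and emitting the others. The plain-Java simulation below models only that control flow, using a queue as the feedback edge; FeedbackLoopSim is a hypothetical name, and this is not how Flink actually executes iterations.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class FeedbackLoopSim {
    // Each record goes through the loop body (value - 1). Results > 0 are fed
    // back to the head (the closeWith(feedback) part); the rest leave as output.
    static List<Long> run(List<Long> input) {
        Deque<Long> head = new ArrayDeque<>(input); // iteration head: input + feedback
        List<Long> output = new ArrayList<>();
        while (!head.isEmpty()) {
            long body = head.poll() - 1;            // iteration body
            if (body > 0) {
                head.add(body);                     // feedback edge back to the head
            } else {
                output.add(body);                   // split off: leaves the iteration
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1L, 3L, 0L)));
    }
}
```

The simulation terminates only because its input is finite; in a real streaming job the input is unbounded and the loop keeps running. Relatedly, as far as I know DataStream.iterate(long) takes the maximum time in milliseconds the head waits for feedback before shutting down, not a number of iterations as IterativeDataSet's iterate(int) does.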

Best Regards,
Subash Basnet



Re: closeWith(...) throws an error with DataStream, but works with DataSet

Biplob Biswas
I am also a newbie, but from what I experienced during my experiments, the same implementation doesn't work in the streaming context because:
1) In a streaming context the stream is assumed to be infinite, so the iteration is also infinite, and the stream with which you close the iteration is sent back to the iteration head.

Thus, your loop needs to be a connected iterative stream, something like this:

IterativeStream.ConnectedIterativeStreams<Centroid, Centroid> loop = centroids.iterate(numIterations).withFeedbackType(Centroid.class);

since the feedback is distributed to the partitions again.
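With withFeedbackType(...), the iteration head becomes a ConnectedIterativeStreams, so the loop body has two inputs: the regular input and the feedback. In Flink such a body is typically a CoMapFunction or CoFlatMapFunction, whose map1/map2 methods handle the two sides. The plain-Java sketch below only models that idea for k-means, assuming points arrive on the regular input and updated centroids arrive on the feedback side; the CoMap interface and NearestCenterCoMap class are hypothetical stand-ins, not Flink classes, and the sketch ignores how the centroid state would reach every parallel instance in a real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class ConnectedIterationSketch {
    // Hypothetical mirror of a two-input co-function (not the Flink interface):
    // map1 handles the regular input, map2 handles the feedback input.
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value);
        OUT map2(IN2 value);
    }

    // Assigns each point to the nearest of the most recently received centroids.
    static class NearestCenterCoMap implements CoMap<double[], List<double[]>, Optional<Integer>> {
        private List<double[]> centroids = new ArrayList<>(); // latest model from feedback

        @Override
        public Optional<Integer> map1(double[] point) {
            int best = -1;
            double bestDist = Double.MAX_VALUE;
            for (int i = 0; i < centroids.size(); i++) {
                double dx = point[0] - centroids.get(i)[0];
                double dy = point[1] - centroids.get(i)[1];
                double d = dx * dx + dy * dy;
                if (d < bestDist) { bestDist = d; best = i; }
            }
            // No centroids received yet: nothing to assign the point to.
            return best < 0 ? Optional.empty() : Optional.of(best);
        }

        @Override
        public Optional<Integer> map2(List<double[]> newCentroids) {
            centroids = new ArrayList<>(newCentroids); // replace the current model
            return Optional.empty();                    // model updates emit nothing
        }
    }

    public static void main(String[] args) {
        NearestCenterCoMap f = new NearestCenterCoMap();
        System.out.println(f.map1(new double[] {1, 1}));
        f.map2(List.of(new double[] {0, 0}, new double[] {10, 10}));
        System.out.println(f.map1(new double[] {9, 8}));
    }
}
```

The design point is that the centroids are no longer a broadcast set fixed per superstep, as in the DataSet version: they are state that changes whenever new feedback arrives.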

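Also, newCentroids in loop.closeWith(newCentroids); can't be just any DataStream: it must be the feedback stream that originates from the iteration itself, i.e. it has to be derived from loop. In your code newCentroids is derived from points, not from loop, which is what the exception complains about.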
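As far as I understand, IterativeStream.closeWith(...) checks whether the iteration head is among the transitive upstream operators of the stream passed in, and throws the UnsupportedOperationException otherwise. The toy graph model below illustrates that rule; the Node class and the originatesFrom/closeWith helpers are hypothetical and only mimic the check, they are not Flink's implementation. A stream built only from points fails, while one that also reads from loop passes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class OriginCheck {
    // Hypothetical dataflow node: a named operator and its direct inputs.
    static final class Node {
        final String name;
        final List<Node> inputs;
        Node(String name, Node... inputs) { this.name = name; this.inputs = List.of(inputs); }
    }

    // True if 'head' is 'stream' itself or one of its transitive upstream nodes.
    static boolean originatesFrom(Node stream, Node head) {
        Deque<Node> todo = new ArrayDeque<>(List.of(stream));
        Set<Node> seen = new HashSet<>();
        while (!todo.isEmpty()) {
            Node n = todo.pop();
            if (n == head) return true;
            if (seen.add(n)) todo.addAll(n.inputs);
        }
        return false;
    }

    // Mimics the rule behind the exception: reject feedback not derived from the head.
    static void closeWith(Node head, Node feedback) {
        if (!originatesFrom(feedback, head)) {
            throw new UnsupportedOperationException(
                "Cannot close an iteration with a feedback DataStream that does not originate from said iteration.");
        }
    }

    public static void main(String[] args) {
        Node centroids = new Node("centroids");
        Node points = new Node("points");
        Node loop = new Node("loop", centroids);                 // iteration head
        Node fromPoints = new Node("newCentroids", points);      // as in the question
        Node fromLoop = new Node("newCentroids2", loop, points); // reads head and points
        closeWith(loop, fromLoop);                               // accepted
        try {
            closeWith(loop, fromPoints);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```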

Whether this solves your problem I can't say, as I have been struggling myself for some time with the concept of centroids in an iteration with broadcast, without much help from the community.
