Hello all,
loop.closeWith(...) in the code below works fine for DataSet, but the same code adapted for DataStream throws an exception.

For DataSet:

    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
    DataSet<Centroid> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .map(new CountAppender())
        .groupBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    // feed new centroids back into next iteration
    DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

This works fine. If I do the same operation with DataStream, as below:

    IterativeStream<Centroid> loop = centroids.iterate(numIterations);
    DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter())
        .map(new CountAppender())
        .keyBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager());
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids);

I get the following exception, as already mentioned in earlier emails:

    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
        at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75)
        at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98)

Could you please suggest where my understanding is wrong? I couldn't infer much from the documentation of closeWith(..) for DataSet and DataStream:

For DataStream:

    Closes the iteration. This method defines the end of the iterative program part that will be fed back to the start of the iteration. A common usage pattern for streaming iterations is to use output splitting to send a part of the closing data stream to the head.

For DataSet:

    Closes the iteration. This method defines the end of the iterative program part.
    Parameters: iterationResult - The data set that will be fed back to the next iteration.
    Returns: The DataSet that represents the result of the iteration, after the computation has terminated.

Best Regards,
Subash Basnet
I am also a newbie, but from what I saw in my own experiments, the same implementation doesn't work in the streaming context for the following reason.

In the streaming context the stream is assumed to be infinite, so the iteration is also infinite, and the stream you close the iteration with is sent back to the iteration head. Thus, your iteration needs to be declared something like this:

    IterativeStream.ConnectedIterativeStreams<Centroid, Centroid> loop = centroids.iterate(numIterations).withFeedbackType(Centroid.class);

as the feedback is distributed across the partitions again. Also, the newCentroids in "loop.closeWith(newCentroids);" can't be replaced with just any other DataStream, because it has to be the feedback stream that originates from that iteration.

Whether this solves your problem I can't say, as I have been struggling with the concept of centroids in an iteration with broadcast for some time myself, without much help from the community.
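
To make the "originates from said iteration" rule concrete, here is a small plain-Java toy model of it. It is only a sketch, not Flink's implementation: the Node class and the then and closeWith helpers are hypothetical names made up for illustration, and the only assumption is that the check amounts to asking whether the feedback stream transitively depends on the iteration head.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class FeedbackOriginCheck {

    /** Hypothetical stand-in for a stream in a dataflow graph; not a Flink class. */
    static class Node {
        final String name;
        final List<Node> inputs = new ArrayList<>();

        Node(String name, Node... inputs) {
            this.name = name;
            for (Node in : inputs) {
                this.inputs.add(in);
            }
        }

        /** Derives a new node from this one, like map() or keyBy().reduce(). */
        Node then(String opName) {
            return new Node(opName, this);
        }

        /** Collects this node and every node it transitively depends on. */
        Set<Node> ancestors() {
            Set<Node> seen = new HashSet<>();
            collect(this, seen);
            return seen;
        }

        private static void collect(Node n, Set<Node> seen) {
            if (seen.add(n)) {
                for (Node in : n.inputs) {
                    collect(in, seen);
                }
            }
        }
    }

    /** Mimics the rule in the exception message: feedback must descend from the head. */
    static void closeWith(Node head, Node feedback) {
        if (!feedback.ancestors().contains(head)) {
            throw new UnsupportedOperationException(
                "Cannot close an iteration with a feedback DataStream "
                    + "that does not originate from said iteration.");
        }
    }

    public static void main(String[] args) {
        Node centroids = new Node("centroids");
        Node points = new Node("points");
        Node loop = centroids.then("iterate");

        // Shape of the pipeline in the question: built only from points, never reads loop.
        Node detached = points.then("selectNearestCenter").then("accumulate").then("average");
        try {
            closeWith(loop, detached);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // A pipeline whose first operator reads both points and the loop head.
        Node attached = new Node("selectNearestCenter", points, loop)
            .then("accumulate").then("average");
        closeWith(loop, attached);
        System.out.println("accepted");
    }
}
```

In the DataSet version, withBroadcastSet(loop, "centroids") is what ties the pipeline to the loop; the DataStream version in the question drops that call, so newCentroids depends only on points. The analogous change in the Flink program would presumably be to let the first operator consume loop as well as points, for example through a connected stream, but I have not worked out the exact operators here.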