Hello all,
I am getting the below error on execute of StreamExecutionEnvironment. Caused by: java.lang.IllegalStateException: Iteration FeedbackTransformation{id=15, name='Feedback', outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback edges. The run method inside the thread class of DataStreamUtils handles this exception: @Override public void run(){ try { stream.getExecutionEnvironment().execute(); } catch (Exception e) { throw new RuntimeException("Exception in execute()", e); } } I am not able to understand what to infer from this error message so that I could solve it. Best Regards, Subash Basnet |
Could you please post your code. On Sat, 7 May 2016 at 19:16 subash basnet <[hidden email]> wrote:
|
In reply to this post by subashbasnet
Hello Aljoscha, Below is the shorted version of StockAnalysis class which is a datastream adapation of the KMeans.java dataset code. public class StockAnalysis{ public static void main(String args[]){ DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter()); loop = centroids.iterate(10); DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter()).map(new CountAppender()).keyBy(0) .reduce(new CentroidAccumulator()).map(new CentroidAverager()); public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> { private Collection<Centroid> centroids; @Override public void open(Configuration parameters) throws Exception { Iterator<Centroid> iter = DataStreamUtils.collect(loop); this.centroids = Lists.newArrayList(iter); } @Override public Tuple2<String, Point> map(Point p) throws Exception { for (Centroid centroid : centroids) { }................... } } } On Sun, May 8, 2016 at 7:10 AM, Aljoscha Krettek <[hidden email]> wrote:
|
Hello, I had to use, private static IterativeStream<Centroid> loop; loop as global variable because I cannot broadcast it like that of DataSet API in DataStream API. I tried to use closewith like that of DataSet as below in DataStream but it gives me exception: DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids); Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration. at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75) at wikiedits.StockAnalysis.main(StockAnalysis.java:64) Best Regards, Subash Basnet On Sat, May 14, 2016 at 4:26 PM, subash basnet <[hidden email]> wrote:
|
There you have your explanation. A loop actually has to be a loop for it to work in Flink. On Sat, 14 May 2016 at 16:35 subash basnet <[hidden email]> wrote:
|
In reply to this post by subashbasnet
Hello Aljoscha, For DataSet: IterativeDataSet<Centroid> loop = centroids.iterate(numIterations); DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") .map(new CountAppender()).groupBy(0).reduce(new CentroidAccumulator()) .map(new CentroidAverager()); // feed new centroids back into next iteration DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids); It's working fine, now if I do the same operation in DataStream as below: IterativeDataStream<Centroid>loop = centroids.iterate(numIterations); DataStream<Centroid> newCentroids = points.map(new SelectNearestCenter()).map(new CountAppender()).keyBy(0) .reduce(new CentroidAccumulator()).map(new CentroidAverager()); DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids); I get the following exception as already mentioned in earlier thread: Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration. at org.apache.flink.streaming.api.datastream.IterativeStream.closeWith(IterativeStream.java:75) at wikiedits.StockAnalysisKMeansOutlierDetection.main(StockAnalysisKMeansOutlierDetection.java:98) Could you please suggest me where I am wrong here? Best Regards, Subash Basnet On Mon, May 16, 2016 at 6:11 PM, Aljoscha Krettek <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |