Hello all,
How can I broadcast a variable in the DataStream API, or perform a similar operation, so that I can read the value the way I do with a DataSet?

    IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
    DataSet<Centroid> newCentroids = points.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids") ...

Inside the map function:

    @Override
    public void open(...) {
        this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
    }

Is defining 'loop' as a global variable the only way to use it inside map functions, or are there other possible methods?

When I declare loop as a global variable and read it inside the map function via DataStreamUtils, as below:

    private static IterativeStream<Centroid> loop;
    ...
    loop = centroids.iterate(numIterations);
    ...

Inside the map function:

    @Override
    public void open(...) {
        Iterator<Centroid> iter = DataStreamUtils.collect(loop);
        this.centroids = Lists.newArrayList(iter);
    }

it throws the exception below upon execution:

    Exception in thread "Thread-13" java.lang.RuntimeException: Exception in execute()
        at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:82)
    Caused by: java.lang.IllegalStateException: Iteration FeedbackTransformation{id=15, name='Feedback', outputType=PojoType<wikiedits.StockAnalysis$Centroid, fields = [id: String, pt: BasicArrayTypeInfo<Double>]>, parallelism=4} does not have any feedback edges.
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformFeedback(StreamGraphGenerator.java:295)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:166)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformSink(StreamGraphGenerator.java:441)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:158)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generateInternal(StreamGraphGenerator.java:127)
        at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:119)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1197)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1170)
        at org.apache.flink.contrib.streaming.DataStreamUtils$CallExecute.run(DataStreamUtils.java:80)

Could you please suggest a possible cause of, and a solution to, this exception? Without a way to broadcast a variable in DataStream, I do not see any option other than using a global variable.

Best Regards,
Subash Basnet
Hi,

something like .withBroadcastSet() is not yet available in the DataStream API. I'm working on it, however. Using a (global) static variable will not work for this case since the computation is distributed.

The iteration does not work because the head of the iteration (the "loop" variable) is not used anywhere, therefore the loop is not closed.

Cheers,
Aljoscha

(Replying to subash basnet <[hidden email]>, who wrote on Mon, 16 May 2016 at 21:37.)
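
For readers looking for a workaround, one pattern that fits the explanation above is to feed the centroids into the operator as a second input instead of a broadcast set. In the DataStream API this is typically wired up with centroids.broadcast(), points.connect(...) and a CoFlatMapFunction, so that every parallel instance receives all centroids and keeps them as its own local state. Separately, an IterativeStream has to be closed with closeWith(...), otherwise the graph generator finds no feedback edge. The sketch below is plain Java rather than Flink code, and it is not the implementation of either poster. It only illustrates the logic such an operator might contain. The class NearestCentroidAssigner and its methods onCentroid and onPoint are hypothetical names standing in for flatMap1 and flatMap2, and buffering early points is an assumption about how one might handle points that arrive before any centroid.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the body of a CoFlatMapFunction.
// Each parallel operator instance would own one such object, so the
// centroid map is per-instance state rather than a shared static field.
class NearestCentroidAssigner {

    // Latest known position per centroid id, filled from the centroid input.
    private final Map<String, double[]> centroids = new LinkedHashMap<>();

    // Points seen before the first centroid arrived (assumed buffering policy).
    private final List<double[]> pending = new ArrayList<>();

    // Plays the role of flatMap1: invoked for each broadcast centroid.
    // Returns the assignments of any points that were waiting for a centroid.
    List<String> onCentroid(String id, double[] position) {
        centroids.put(id, position.clone());
        List<String> out = new ArrayList<>();
        for (double[] p : pending) {
            out.add(nearest(p));
        }
        pending.clear();
        return out;
    }

    // Plays the role of flatMap2: invoked for each data point.
    // Emits the id of the nearest centroid, or nothing if none is known yet.
    List<String> onPoint(double[] point) {
        List<String> out = new ArrayList<>();
        if (centroids.isEmpty()) {
            pending.add(point.clone());
        } else {
            out.add(nearest(point));
        }
        return out;
    }

    // Squared Euclidean distance; assumes points and centroids share a dimension.
    private String nearest(double[] point) {
        String bestId = null;
        double bestDist = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, double[]> e : centroids.entrySet()) {
            double[] c = e.getValue();
            double dist = 0.0;
            for (int i = 0; i < point.length; i++) {
                double d = point[i] - c[i];
                dist += d * d;
            }
            if (dist < bestDist) {
                bestDist = dist;
                bestId = e.getKey();
            }
        }
        return bestId;
    }
}
```

In a real job the two methods would write to a Collector instead of returning lists, and the recomputed centroids would be the stream passed to closeWith(...) so that the iteration head is actually used.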