Hello all,
I am trying to cluster data stream points around centroids. My input is stock data, and I use the timestamp of the stock as the centroid id. The problem I am facing is getting the id of the centroid within flatMap2. Below is my code, if you could take a look:

    ConnectedIterativeStreams<Point, Centroid> loop = points.iterate().withFeedbackType(Centroid.class);
    DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
            .map(new CountAppender())
            .keyBy(0)
            .reduce(new CentroidAccumulator())
            .map(new CentroidAverager());
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids.broadcast());

    public static final class SelectNearestCenter implements CoFlatMapFunction<Point, Centroid, Tuple2<String, Point>> {
        private Centroid[] centroids;
        private int size = 0;
        private int count = 0;
        private boolean flag = true;

        public SelectNearestCenter(int size) {
            this.size = size;
        }

        @Override
        public void flatMap1(Point p, Collector<Tuple2<String, Point>> out) throws Exception {
            double minDistance = Double.MAX_VALUE;
            String closestCentroidId = "-1";
            if (centroids != null) {
                // let's assume minimum size 20 for now
                for (Centroid centroid : centroids) {
                    // compute distance
                    double distance = p.euclideanDistance(centroid);
                    // update nearest cluster if necessary
                    if (distance < minDistance) {
                        minDistance = distance;
                        closestCentroidId = centroid.id;
                    }
                }
            }
            // emit a new record with the center id and the data point.
            out.collect(new Tuple2<String, Point>(closestCentroidId, p));
        }

        @Override
        public void flatMap2(Centroid value, Collector<Tuple2<String, Point>> out) throws Exception {
            if (flag) {
                centroids = new Centroid[size];
                flag = false;
            }
            if (count < size) {
                System.out.println(value);
                centroids[count] = value;
                count++;
            }
        }
    }

The centroid data stream looks as below, with the string timestamp as id:
    Fri Jul 15 15:30:55 CEST 2016 117.8818 117.9 117.8 117.835 1383700.0
    Fri Jul 15 15:31:58 CEST 2016 117.835 117.99 117.82 117.885 118900.0

But now if I print the centroid value in flatMap2, it shows the id as '-1':

    -1 117.8818 117.9 117.8 117.835 1383700.0
    -1 117.5309 117.575 117.48245 117.52 707100.0

This '-1' comes from flatMap1, where it is assigned initially. To get rid of it, I tried putting the out.collect statement inside the "if (centroids != null)" condition, but then execution never enters the if block, because centroids is initially null, so nothing ever comes out of flatMap1.

It would be great if you could suggest what the probable problem is, or a solution to the case.

Best Regards,
Subash Basnet
Hi,
you have to make sure to filter the data that you send back on the feedback edge, i.e. the loop.closeWith(newCentroids.broadcast()); statement needs to take a stream that contains only the centroids that you want to send back. You also need to make sure to emit centroids with a good timestamp if you want to preserve timestamps.

What you can also do is union the stream of initial centroids with the new centroids on the feedback edge, i.e.:

    loop.closeWith(newCentroids.union(initialCentroids).broadcast())

Cheers,
Aljoscha

On Mon, 18 Jul 2016 at 12:59 subash basnet <[hidden email]> wrote:
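As an illustration of the filtering advice above, here is a minimal plain-Java sketch with no Flink dependency. It assumes that the '-1' centroids seen in flatMap2 are aggregates of points that flatMap1 tagged with the placeholder id before any centroid had arrived. The class FeedbackFilterSketch, its nested Centroid stand-in, and the methods keepForFeedback and feedback are hypothetical names introduced only for this example; in a Flink job the same predicate would presumably sit in a filter(...) applied to newCentroids before closeWith.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Plain-Java model (not Flink code) of filtering what goes onto the feedback edge. */
class FeedbackFilterSketch {

    /** Placeholder id that flatMap1 in the question assigns when no centroid is known yet. */
    static final String PLACEHOLDER_ID = "-1";

    /** Hypothetical stand-in for the poster's Centroid type: an id plus numeric fields. */
    static final class Centroid {
        final String id;
        final double[] values;

        Centroid(String id, double... values) {
            this.id = id;
            this.values = values;
        }
    }

    /** Keep only centroids that carry a real id; drop the '-1' placeholder aggregates. */
    static boolean keepForFeedback(Centroid c) {
        return c != null && c.id != null && !PLACEHOLDER_ID.equals(c.id);
    }

    /** Models newCentroids.filter(...).union(initialCentroids): what would reach flatMap2. */
    static List<Centroid> feedback(List<Centroid> newCentroids, List<Centroid> initialCentroids) {
        return Stream.concat(
                        newCentroids.stream().filter(FeedbackFilterSketch::keepForFeedback),
                        initialCentroids.stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Sample rows taken from the thread; the first one carries the placeholder id.
        List<Centroid> fresh = Arrays.asList(
                new Centroid("-1", 117.8818, 117.9, 117.8, 117.835, 1383700.0),
                new Centroid("Fri Jul 15 15:31:58 CEST 2016", 117.835, 117.99, 117.82, 117.885, 118900.0));
        List<Centroid> initial = Collections.singletonList(
                new Centroid("Fri Jul 15 15:30:55 CEST 2016", 117.8818, 117.9, 117.8, 117.835, 1383700.0));

        // Print the ids that survive the filter and would be fed back.
        for (Centroid c : feedback(fresh, initial)) {
            System.out.println(c.id);
        }
    }
}
```

The union with the initial centroids matters here because, without it, the filter alone would leave the loop with nothing to feed back until a real centroid exists.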
Hello Aljoscha Krettek,

Thank you. As you suggested, I changed my code as below:

snippet 1:

    DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());
    ConnectedIterativeStreams<Point, Centroid> loop = points.iterate().withFeedbackType(Centroid.class);
    DataStream<Centroid> newCentroids = loop.flatMap(new SelectNearestCenter(10))
            .map(new CountAppender())
            .keyBy(0)
            .reduce(new CentroidAccumulator())
            .map(new CentroidAverager());
    DataStream<Centroid> finalCentroids = loop.closeWith(newCentroids.union(centroids).broadcast());

snippet 2:

    ConnectedIterativeStreams<Point, Centroid> loop1 = points.iterate()
            .withFeedbackType(Centroid.class);
    DataStream<ClusteredPoint> clusteredPoints = loop1.flatMap(new SelectNearestCenterForPoints(10));
    loop1.closeWith(finalCentroids.broadcast());
    clusteredPoints.print();

    public class SelectNearestCenterForPoints implements CoFlatMapFunction<Point, Centroid, ClusteredPoint> {
        ....
        out.collect(new ClusteredPoint(closestCentroidId, p));
        ...
    }

My problem is that I get an UnsupportedOperationException when executing snippet 2, as below:

    Exception in thread "main" java.lang.UnsupportedOperationException: Cannot close an iteration with a feedback DataStream that does not originate from said iteration.
        at org.apache.flink.streaming.api.datastream.IterativeStream$ConnectedIterativeStreams.closeWith(IterativeStream.java:181)

In snippet 1, the final data stream required was of the Centroid data type. But in snippet 2, I need to get another data type, ClusteredPoint, and pass finalCentroids and points in order to collect the clusteredPoints. What could be the solution to achieve this?

Regards,
Subash Basnet

On Tue, Jul 19, 2016 at 12:20 PM, Aljoscha Krettek <[hidden email]> wrote:
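The exception text says the stream passed to closeWith must originate from the same iteration, and in snippet 2 finalCentroids comes from loop rather than loop1. One possible direction, which is an assumption and not something confirmed in this thread, is to skip the second iteration and connect the two streams directly, e.g. points.connect(finalCentroids.broadcast()).flatMap(...), since the second step only reads centroids and does not feed anything back. The plain-Java sketch below models only the two-input assignment logic such a function would need; AssignPointsSketch, onCentroid and onPoint are hypothetical names, and returning null when no centroid is known yet is a choice made for this example rather than the poster's behaviour.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model (not Flink code) of a two-input "assign point to nearest centroid" step. */
class AssignPointsSketch {

    /** Known centroids keyed by id; a later centroid with the same id replaces the earlier one. */
    private final Map<String, double[]> centroids = new LinkedHashMap<>();

    /** Models the centroid input (flatMap2): remember or update a centroid by id. */
    void onCentroid(String id, double[] values) {
        centroids.put(id, values.clone());
    }

    /** Models the point input (flatMap1): id of the nearest known centroid, or null if none yet. */
    String onPoint(double[] p) {
        String closestId = null;
        double minDistance = Double.MAX_VALUE;
        for (Map.Entry<String, double[]> e : centroids.entrySet()) {
            double distance = euclideanDistance(p, e.getValue());
            if (distance < minDistance) {
                minDistance = distance;
                closestId = e.getKey();
            }
        }
        return closestId;
    }

    /** Euclidean distance between two vectors of equal length. */
    static double euclideanDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double diff = a[i] - b[i];
            sum += diff * diff;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        AssignPointsSketch assigner = new AssignPointsSketch();
        // Centroid rows taken from the thread, with the timestamp string as id.
        assigner.onCentroid("Fri Jul 15 15:30:55 CEST 2016",
                new double[] {117.8818, 117.9, 117.8, 117.835, 1383700.0});
        assigner.onCentroid("Fri Jul 15 15:31:58 CEST 2016",
                new double[] {117.835, 117.99, 117.82, 117.885, 118900.0});
        // A point row from the thread; print the id of the centroid it is assigned to.
        System.out.println(assigner.onPoint(
                new double[] {117.5309, 117.575, 117.48245, 117.52, 707100.0}));
    }
}
```

Note that with unscaled inputs like these, the last field (the large volume-like value) dominates the distance, so the assignment is effectively decided by that one column.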