Hello all,
My task is to cluster a stream of points around centroids. I am using DataStreamUtils to collect the stream and pass it on to a map function to perform the necessary action. Below is the code:

DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> collectionCentroids = Lists.newArrayList(iter);

DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter(collectionCentroids))
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());

Iterator<Centroid> iter1 = DataStreamUtils.collect(newCentroids);
Collection<Centroid> finalCentroidsCollection = Lists.newArrayList(iter1);

// assign points to final clusters
DataStream<Tuple2<String, Point>> clusteredPoints = points
        .map(new SelectNearestCenter(finalCentroidsCollection));
clusteredPoints.print();

public static final class SelectNearestCenter
        extends RichMapFunction<Point, Tuple2<String, Point>> {

    private Collection<Centroid> centroids;

    public SelectNearestCenter(Collection<Centroid> centroids) {
        this.centroids = centroids;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        ..................
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}

Cases:

1. I waited for around 10 minutes, and the clusteredPoints got printed, but with the centroid id as '-1' for all the points. The execution also ends after a certain time with the exception below, due to multiple job executions, since there is already one triggered inside DataStreamUtils:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

How do I get rid of this exception?
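For reference, the elided part of map() is the standard k-means assignment step. A minimal self-contained sketch of that step is below; it uses simplified 1-D stand-ins for the Point and Centroid classes (the real field names are not shown in the post, so these are assumptions). Note that the id stays "-1" exactly when the centroids collection is empty, which matches the output in case 1:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

public class NearestCenter {

    // Simplified 1-D stand-in for the Centroid class in the post.
    static class Centroid {
        final String id;
        final double x;
        Centroid(String id, double x) { this.id = id; this.x = x; }
    }

    // Returns the id of the centroid closest to point p,
    // or "-1" if the centroids collection is empty.
    static String selectNearest(double p, Collection<Centroid> centroids) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (Centroid c : centroids) {
            double distance = Math.abs(c.x - p);
            if (distance < minDistance) {
                minDistance = distance;
                closestCentroidId = c.id;
            }
        }
        return closestCentroidId;
    }

    public static void main(String[] args) {
        Collection<Centroid> cs = Arrays.asList(
                new Centroid("a", 0.0), new Centroid("b", 10.0));
        System.out.println(selectNearest(2.5, cs));                     // prints "a"
        System.out.println(selectNearest(2.5, Collections.emptyList())); // prints "-1"
    }
}
```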
Output for case 1:

1> (-1, 121.86 121.87 121.8149 121.8149 60600.0)
4> (-1, 121.52 121.52 121.45 121.485 28800.0)
.........

2. On another run of the same code, the centroid ids come out as timestamps instead:

1> (Wed Jul 20 16:45:01 CEST 2016, 121.555 121.56 121.53 121.5385 69300.0)
1> (Wed Jul 20 18:19:00 CEST 2016, 121.8699 121.89 121.86 121.86 25700.0)
3> (Wed Jul 20 16:41:59 CEST 2016, 121.415 121.47 121.41 121.4658 38400.0)
1> (Wed Jul 20 18:13:59 CEST 2016, 121.86 121.87 121.8149 121.8149 60600.0)
4> (Wed Jul 20 16:43:59 CEST 2016, 121.52 121.52 121.45 121.485 28800.0)
3> (Wed Jul 20 18:16:59 CEST 2016, 121.8716 121.92 121.85 121.9141 64500.0)
4> (Wed Jul 20 18:15:00 CEST 2016, 121.92 121.92 121.88 121.88 53500.0)
4> (Wed Jul 20 18:12:04 CEST 2016, 121.82 121.82 121.74 121.74 43600.0)
..............

Why is there so much variation in the results when executing the same code? Also, with the centroid id stuck at '-1' as in case 1, I cannot perform any operations later on, since all the clusteredPoints carry the same centroid id '-1', which should instead have been a timestamp as shown in case 2. What could be the solution to this issue?

Best Regards,
Subash Basnet