DataStreamUtils conversion problem, showing varied results for same code

subashbasnet
Hello all,

My task is to cluster a stream of points around centroids. I am using DataStreamUtils to collect the stream and pass the result to a map function that performs the necessary action. Below is the code:

DataStream<Point> points = newDataStream.map(new getPoints());
DataStream<Centroid> centroids = newCentroidDataStream.map(new TupleCentroidConverter());

Iterator<Centroid> iter = DataStreamUtils.collect(centroids);
Collection<Centroid> collectionCentroids = Lists.newArrayList(iter);
DataStream<Centroid> newCentroids = points
        .map(new SelectNearestCenter(collectionCentroids))
        .map(new CountAppender())
        .keyBy(0)
        .reduce(new CentroidAccumulator())
        .map(new CentroidAverager());

Iterator<Centroid> iter1 = DataStreamUtils.collect(newCentroids);
Collection<Centroid> finalCentroidsCollection = Lists.newArrayList(iter1);
DataStream<Tuple2<String, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(finalCentroidsCollection));
clusteredPoints.print();

public static final class SelectNearestCenter extends RichMapFunction<Point, Tuple2<String, Point>> {
    private Collection<Centroid> centroids;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    public SelectNearestCenter(Collection<Centroid> centroids) {
        this.centroids = centroids;
    }

    @Override
    public Tuple2<String, Point> map(Point p) throws Exception {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        ..................
        return new Tuple2<String, Point>(closestCentroidId, p);
    }
}
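
For reference, here is a minimal plain-Java sketch of the nearest-centroid selection, with no Flink dependency. It assumes the elided part of map() is an ordinary minimum-distance loop over the centroids; the Centroid stand-in, the Euclidean distance, and the names NearestCentroidSketch, distance and nearestId are hypothetical and are not taken from the job above. Under that assumption, an empty centroid collection leaves closestCentroidId at its initial value "-1", which would match the output in case 1 below.

```java
import java.util.Collection;

// Plain-Java sketch; all names here are hypothetical illustrations.
final class NearestCentroidSketch {

    // Hypothetical stand-in for the Centroid type: an id plus coordinates.
    static final class Centroid {
        final String id;
        final double[] coords;

        Centroid(String id, double... coords) {
            this.id = id;
            this.coords = coords;
        }
    }

    // Euclidean distance is an assumption; the real distance measure is elided in the post.
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Returns the id of the closest centroid, or "-1" if the collection is empty.
    static String nearestId(double[] point, Collection<Centroid> centroids) {
        double minDistance = Double.MAX_VALUE;
        String closestCentroidId = "-1";
        for (Centroid c : centroids) {
            double dist = distance(point, c.coords);
            if (dist < minDistance) {
                minDistance = dist;
                closestCentroidId = c.id;
            }
        }
        return closestCentroidId;
    }
}
```

If this assumption holds, checking whether the collected centroid collection is empty before building the second map would show whether case 1 comes from the collection step or from the distance logic.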

Cases:
1. After waiting around 10 minutes, clusteredPoints is printed, but with centroid id '-1' for all the points. The execution then ends after a certain time with the exception below, apparently because of multiple executions, since one is already triggered inside DataStreamUtils. How can I get rid of this exception?
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

1> (-1, 121.86 121.87 121.8149 121.8149 60600.0)
4> (-1, 121.52 121.52 121.45 121.485 28800.0)
.........

2. After waiting around 10 minutes, clusteredPoints is printed with the desired centroid ids, as shown below. The clustered points are also printed to the console in a streaming manner, no exception is thrown, and the streaming continues.

1> (Wed Jul 20 16:45:01 CEST 2016, 121.555 121.56 121.53 121.5385 69300.0)
1> (Wed Jul 20 18:19:00 CEST 2016, 121.8699 121.89 121.86 121.86 25700.0)
3> (Wed Jul 20 16:41:59 CEST 2016, 121.415 121.47 121.41 121.4658 38400.0)
1> (Wed Jul 20 18:13:59 CEST 2016, 121.86 121.87 121.8149 121.8149 60600.0)
4> (Wed Jul 20 16:43:59 CEST 2016, 121.52 121.52 121.45 121.485 28800.0)
3> (Wed Jul 20 18:16:59 CEST 2016, 121.8716 121.92 121.85 121.9141 64500.0)
4> (Wed Jul 20 18:15:00 CEST 2016, 121.92 121.92 121.88 121.88 53500.0)
4> (Wed Jul 20 18:12:04 CEST 2016, 121.82 121.82 121.74 121.74 43600.0)
..............

3. clusteredPoints is printed with the desired centroid ids in a streaming manner, but after a certain duration the same exception as in case 1 is thrown and the program ends abruptly.

Why is there so much variation in the results when executing the same code? With the centroid id '-1' of case 1, I cannot perform any later operations, since all the clusteredPoints have the same centroid id '-1', which should instead have been a timestamp as shown in case 2. What could be the solution to this issue?

Best Regards,
Subash Basnet