Hello,

I have defined a filter for the termination condition in k-means. I think the problem is here:

    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
            newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));

or maybe in the filter function:

    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            } else {
                return false;
            }
        }
    }

Best regards,
Paul

    public void run() {
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }

        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");

        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }

        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);

        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));

        // feed new centroids back into next iteration with termination condition
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids,
                newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));

        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");

        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();

        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {

        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            } else {
                return false;
            }
        }
    }
|
Termination happens if the "termination criterion" data set is empty. Maybe your filter is too aggressive and filters out everything, or the join is wrong and nothing joins... On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <[hidden email]> wrote:
|
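To make the rule from the reply above concrete (the loop stops as soon as the termination criterion data set is empty), here is a minimal plain-Java sketch. It is not Flink code: `TerminationSketch`, `run`, `halve`, `changed` and `unchanged` are hypothetical names, and integer lists stand in for the centroid data set. It only illustrates how the choice of criterion decides when a bulk iteration ends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Plain-Java stand-in for a bulk iteration with a termination criterion.
// No Flink classes are used; every name here is hypothetical.
class TerminationSketch {

    // Runs `step` until the criterion list computed for a superstep is empty
    // or maxIterations is reached; returns the number of supersteps executed.
    static int run(List<Integer> initial, int maxIterations,
                   UnaryOperator<List<Integer>> step,
                   BiFunction<List<Integer>, List<Integer>, List<Integer>> criterion) {
        List<Integer> current = initial;
        int supersteps = 0;
        while (supersteps < maxIterations) {
            List<Integer> next = step.apply(current);
            supersteps++;
            boolean criterionEmpty = criterion.apply(current, next).isEmpty();
            current = next;
            if (criterionEmpty) {
                break; // empty criterion: stop before maxIterations
            }
        }
        return supersteps;
    }

    // Toy update step: halve every value (integer division).
    static List<Integer> halve(List<Integer> values) {
        List<Integer> out = new ArrayList<>();
        for (int v : values) {
            out.add(v / 2);
        }
        return out;
    }

    // Criterion that keeps the positions whose value changed in this superstep.
    static List<Integer> changed(List<Integer> prev, List<Integer> next) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < prev.size(); i++) {
            if (!prev.get(i).equals(next.get(i))) {
                out.add(next.get(i));
            }
        }
        return out;
    }

    // Criterion that keeps the positions whose value stayed the same.
    static List<Integer> unchanged(List<Integer> prev, List<Integer> next) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < prev.size(); i++) {
            if (prev.get(i).equals(next.get(i))) {
                out.add(next.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> start = Arrays.asList(8, 4);
        System.out.println(run(start, 20, TerminationSketch::halve, TerminationSketch::changed));
        System.out.println(run(start, 20, TerminationSketch::halve, TerminationSketch::unchanged));
    }
}
```

With the `changed` criterion the toy loop runs 5 supersteps, until nothing moves any more. With the `unchanged` criterion it stops after the first superstep, because no value stayed the same and the criterion is empty right away.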
It appears that you're returning true when the previous and current solution are the same. You should instead return false in that case, because this is when the iteration should terminate. Cheers! -- Sachin Goel On Jul 22, 2015 5:46 PM, "Stephan Ewen" <[hidden email]> wrote:
|
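The inversion suggested above can be checked without a cluster. The sketch below is plain Java, not the Flink `FilterFunction` from the question: `FilterDirectionSketch`, `pair`, `keepIfSame`, `keepIfChanged` and `criterionSize` are hypothetical names, and each centroid is represented by a string. It assumes the (previous, current) pairs have already been matched per centroid; how to do that matching in the actual job is outside this sketch.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Compares the posted filter direction with the inverted one on
// (previous, current) centroid pairs. All names are hypothetical.
class FilterDirectionSketch {

    // Helper that builds one (previous, current) pair.
    static Map.Entry<String, String> pair(String previous, String current) {
        return new SimpleEntry<>(previous, current);
    }

    // Direction used in the question: keep a pair when nothing changed.
    static boolean keepIfSame(Map.Entry<String, String> p) {
        return p.getKey().equals(p.getValue());
    }

    // Inverted direction: keep a pair only when the centroid moved.
    static boolean keepIfChanged(Map.Entry<String, String> p) {
        return !p.getKey().equals(p.getValue());
    }

    // Number of records that would end up in the termination criterion.
    static long criterionSize(List<Map.Entry<String, String>> pairs,
                              Predicate<Map.Entry<String, String>> filter) {
        return pairs.stream().filter(filter).count();
    }

    public static void main(String[] args) {
        // Converged state: no centroid moved in the last superstep.
        List<Map.Entry<String, String>> converged =
                Arrays.asList(pair("1.0,1.0", "1.0,1.0"), pair("5.0,5.0", "5.0,5.0"));

        System.out.println(criterionSize(converged, FilterDirectionSketch::keepIfSame));
        System.out.println(criterionSize(converged, FilterDirectionSketch::keepIfChanged));
    }
}
```

For the converged input, `keepIfSame` leaves 2 records in the criterion, so the loop would keep going, while `keepIfChanged` leaves 0, which is the empty criterion that ends the iteration.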
Sachin is right that the filter has to be inverted. Furthermore, the Cheers, On Wed, Jul 22, 2015 at 2:23 PM, Sachin Goel <[hidden email]> wrote:
|