Hello community,

I have written a k-means app in Flink. Now I want to change my termination condition from a maximum iteration count to a check of how much the cluster centers change, but I don't know how to break out of the Flink loop. Here is my Flink execution code:

public void run() {
    // load properties
    Properties pro = new Properties();
    FileSystem fs = null;
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
        fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                new org.apache.hadoop.conf.Configuration());
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");

    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    // set number of bulk iterations for the KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager(this.getBenchmarkCounter()));

    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");

    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers");

    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Best regards,
Paul
Hi,

You can use iterateWithTermination to terminate before max iterations. The feedback for the iteration is then (next solution, isConverged), where isConverged is an empty data set once you wish to terminate. However, this is something I have a pull request open for: https://github.com/apache/flink/pull/918. Take a look.

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685

On Mon, Jul 20, 2015 at 2:55 PM, Pa Rö <[hidden email]> wrote:
I cannot find the "iterateWithTermination" function, only "iterate" and "iterateDelta". I am using Flink 0.9.0 with Java.

2015-07-20 11:30 GMT+02:00 Sachin Goel <[hidden email]>:
Gah. Sorry.

-- Sachin Goel

On Jul 20, 2015 3:21 PM, "Pa Rö" <[hidden email]> wrote:
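As far as I can tell, iterateWithTermination only exists in the Scala API. In the Java API (0.9.0) the equivalent is the two-argument closeWith(result, terminationCriterion) on IterativeDataSet: the loop stops as soon as the termination criterion DataSet becomes empty, or when the iteration cap is reached. A minimal, self-contained sketch of the mechanism (toy values and threshold, not the k-means job):

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;

public class EarlyTerminationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // start a bulk iteration with an upper bound of 100 supersteps
        IterativeDataSet<Double> loop = env.fromElements(16.0, 8.0, 4.0).iterate(100);

        // step function: halve every value once per superstep
        DataSet<Double> halved = loop.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double value) {
                return value / 2.0;
            }
        });

        // termination criterion: the elements that are still "too large";
        // once this DataSet is empty, the iteration stops early
        DataSet<Double> notConverged = halved.filter(new FilterFunction<Double>() {
            @Override
            public boolean filter(Double value) {
                return value >= 1.0;
            }
        });

        // two-argument closeWith: feedback data set plus termination criterion
        DataSet<Double> result = loop.closeWith(halved, notConverged);
        result.print();
    }
}

Here the loop ends after five supersteps, once every value has dropped below 1.0, well before the 100-iteration cap.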
Okay, I have found it. How do I compare my old and new centers?

2015-07-20 12:16 GMT+02:00 Sachin Goel <[hidden email]>:
Use a broadcast set to distribute the old centers to a map function which gets the new centers as its regular input. Put the old centers into a HashMap in open() and check the distance to the new centers in map().

On Jul 20, 2015 12:55 PM, "Pa Rö" <[hidden email]> wrote:
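Sketched in code, that pattern could look like the following. It builds on the types from your program; EPSILON and the distance computation are placeholders (assumptions, not your actual metric), and getId() is taken from your code. A filter is used here instead of a map so that the output can double as a termination criterion:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

// Old centers arrive as a broadcast set, new centers as the regular input.
// open() caches the old centers in a HashMap; filter() keeps a new center
// only if it moved farther than EPSILON from its predecessor.
public class ConvergenceFilter extends RichFilterFunction<GeoTimeDataCenter> {

    private static final double EPSILON = 1e-4; // convergence threshold (assumed value)

    private final Map<Integer, GeoTimeDataCenter> oldCenters =
        new HashMap<Integer, GeoTimeDataCenter>();

    @Override
    public void open(Configuration parameters) {
        // open() runs at the start of each superstep, so refill the cache
        oldCenters.clear();
        for (GeoTimeDataCenter center :
                getRuntimeContext().<GeoTimeDataCenter>getBroadcastVariable("oldCentroids")) {
            oldCenters.put(center.getId(), center);
        }
    }

    @Override
    public boolean filter(GeoTimeDataCenter newCenter) {
        GeoTimeDataCenter old = oldCenters.get(newCenter.getId());
        return old == null || distance(old, newCenter) > EPSILON;
    }

    // placeholder: substitute a real distance over the geo/time fields
    private static double distance(GeoTimeDataCenter a, GeoTimeDataCenter b) {
        return 0.0;
    }
}

The broadcast set is attached where the filter is used, via withBroadcastSet(loop, "oldCentroids").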
I tried the following, but it always runs for maxIterations. Maybe someone can give me a review?

private int benchmarkCounter;
private static int iterationCounter = 1;
private static DataSet<GeoTimeDataCenter> oldCentroids;

FlinkMain(int benchmarkCounter) {
    this.benchmarkCounter = benchmarkCounter;
}

public int getBenchmarkCounter() {
    return benchmarkCounter;
}

public void run() {
    // load properties
    Properties pro = new Properties();
    FileSystem fs = null;
    try {
        pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
        fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),
                new org.apache.hadoop.conf.Configuration());
    } catch (Exception e) {
        e.printStackTrace();
    }
    int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
    String outputPath = fs.getHomeDirectory() + pro.getProperty("flink.output");

    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    DataSet<GeoTimeDataCenter> centroids = null;
    try {
        centroids = getCentroidDataSet(env);
    } catch (Exception e1) {
        e1.printStackTrace();
    }

    // set number of bulk iterations for the KMeans algorithm
    IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
    DataSet<GeoTimeDataCenter> newCentroids = points
        // compute closest centroid for each point
        .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
        // count and sum point coordinates for each centroid
        .groupBy(0).reduceGroup(new CentroidAccumulator())
        // compute new centroids from point counts and coordinate sums
        .map(new CentroidAverager(this.getBenchmarkCounter()));

    // check if the centers have been updated
    DataSet<GeoTimeDataCenter> checkCentroids = null;
    if (this.getBenchmarkCounter() == 0) {
        oldCentroids = newCentroids;
    } else {
        try {
            HashMap<Integer, GeoTimeDataTupel> oldMap = new HashMap<Integer, GeoTimeDataTupel>();
            for (GeoTimeDataCenter i : oldCentroids.collect()) {
                oldMap.put(i.getId(), new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
            }
            HashMap<Integer, GeoTimeDataTupel> newMap = new HashMap<Integer, GeoTimeDataTupel>();
            for (GeoTimeDataCenter i : newCentroids.collect()) {
                // fill the map of new centers
                newMap.put(i.getId(), new GeoTimeDataTupel(i.getGeo(), i.getTime(), i.getidGDELT()));
            }
            if (!GeoTimeDataHelper.compareCentersMaps(oldMap, newMap)) {
                checkCentroids = newCentroids;
            } else {
                checkCentroids = null;
            }
            oldCentroids = newCentroids;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // feed new centroids back into next iteration
    DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, checkCentroids);
    DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");

    // emit result
    clusteredPoints.writeAsCsv(outputPath + "/points", "\n", " ");
    finalCentroids.writeAsText(outputPath + "/centers");

    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

2015-07-20 12:58 GMT+02:00 Fabian Hueske <[hidden email]>:
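The likely reason this always runs for maxIterations: the whole if/else block executes exactly once, on the client, while the program plan is being assembled. It never runs per superstep, so checkCentroids cannot react to values produced inside the loop, and collect() cannot pull intermediate results out of a running bulk iteration; a null checkCentroids also gives closeWith no criterion to evaluate. The convergence check has to be expressed as part of the dataflow itself. A sketch of that rewiring, reusing the hypothetical ConvergenceFilter from above:

// inside run(), replacing the driver-side comparison block:
// keep only the centers that moved more than EPSILON in this superstep
DataSet<GeoTimeDataCenter> moved = newCentroids
    .filter(new ConvergenceFilter())
    .withBroadcastSet(loop, "oldCentroids"); // "loop" carries the previous superstep's centers

// the iteration stops as soon as "moved" is empty
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, moved);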