loop break operation

Pa Rö
Hello community,

I have written a k-means application in Flink. Now I want to change my termination condition from a maximum number of iterations to checking whether the cluster centers are still changing, but I don't know how to break out of the Flink loop. Here is my Flink execution code:

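import java.net.URI;
import java.util.Properties;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.FileSystem;

    public void run() {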
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Is it possible to use a construct like: if(centroids equals points){break the loop}?

Best regards,
Paul

Re: loop break operation

Sachin Goel
Hi,
You can use iterateWithTermination to terminate before the maximum number of iterations is reached. The step function then returns a pair (next solution, isConverged), and the iteration stops when isConverged is an empty data set.
This is also something I have a pull request for: https://github.com/apache/flink/pull/918. Take a look.

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685

Re: loop break operation

Pa Rö
I can't find the "iterateWithTermination" function, only "iterate" and "iterateDelta". I'm using Flink 0.9.0 with Java.

Re: loop break operation

Sachin Goel

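Gah. Sorry.
In the closeWith call, pass a second argument: a DataSet that determines whether the iteration should stop. The iteration terminates as soon as that DataSet is empty (or when the maximum number of iterations is reached).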
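A minimal plain-Java model of that contract, assuming the behavior documented for IterativeDataSet.closeWith(iterationResult, terminationCriterion) in the Java DataSet API: each superstep produces the next partial solution plus a criterion data set, and the loop stops once the criterion is empty or maxIterations supersteps have run. This is not Flink code; java.util.List stands in for DataSet, and BulkIterationModel, Step, Outcome and halveStep are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of a Flink bulk iteration with a termination criterion.
// java.util.List stands in for DataSet; this is NOT Flink API code.
class BulkIterationModel {

    // Output of one superstep: the next partial solution and the
    // termination-criterion elements (the iteration stops when these are empty).
    static final class Step<T> {
        final List<T> next;
        final List<?> criterion;

        Step(List<T> next, List<?> criterion) {
            this.next = next;
            this.criterion = criterion;
        }
    }

    // Final partial solution plus the number of supersteps that actually ran.
    static final class Outcome<T> {
        final List<T> result;
        final int iterations;

        Outcome(List<T> result, int iterations) {
            this.result = result;
            this.iterations = iterations;
        }
    }

    // Runs 'step' until the criterion is empty or maxIterations is reached,
    // mirroring closeWith(iterationResult, terminationCriterion).
    static <T> Outcome<T> iterate(List<T> initial, int maxIterations,
                                  Function<List<T>, Step<T>> step) {
        List<T> current = initial;
        int done = 0;
        while (done < maxIterations) {
            Step<T> s = step.apply(current);
            current = s.next;        // feed back, like closeWith's first argument
            done++;
            if (s.criterion.isEmpty()) {
                break;               // empty criterion: stop early
            }
        }
        return new Outcome<>(current, done);
    }

    // Hypothetical example step: halve every value; keep iterating while
    // any value is still >= 5 (those values form the criterion).
    static Step<Integer> halveStep(List<Integer> values) {
        List<Integer> next = new ArrayList<>();
        List<Integer> stillLarge = new ArrayList<>();
        for (int v : values) {
            int h = v / 2;
            next.add(h);
            if (h >= 5) {
                stillLarge.add(h);
            }
        }
        return new Step<>(next, stillLarge);
    }
}
```

For example, iterate(Arrays.asList(40), 100, BulkIterationModel::halveStep) runs 4 supersteps and returns [2], because the criterion becomes empty after the fourth halving; with maxIterations = 2 it stops at [10]. The key point is that the criterion is recomputed inside every superstep, as part of the data flow, rather than checked once by an if statement in the driver.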

Re: loop break operation

Pa Rö
Okay, I found it. How do I compare my old and new centers?

Re: loop break operation

Fabian Hueske-2

Use a broadcast set to distribute the old centers to a map function that has the new centers as its regular input. Put the old centers into a HashMap in open() and check the distance to each new center in map().
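A plain-Java model of that check, written as a filter (a flatMap works too) so that centers which did not move produce no output. In a Flink job the same logic would presumably live in a RichFilterFunction whose open(Configuration) reads the old centers via getRuntimeContext().getBroadcastVariable(...); here the old centers are passed in directly. The class CenterShiftCheck, the 2-D Center type and the epsilon threshold are hypothetical. The centers that pass the filter would form the termination criterion: once no center moves, it is empty and the iteration stops.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the suggested convergence check (NOT Flink API code).
// In Flink, open() would read the old centers from a broadcast variable.
class CenterShiftCheck {

    // Hypothetical center type: an id plus 2-D coordinates.
    static final class Center {
        final int id;
        final double x;
        final double y;

        Center(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    private final double epsilon;
    private final Map<Integer, Center> oldById = new HashMap<>();

    CenterShiftCheck(double epsilon) {
        this.epsilon = epsilon;
    }

    // Models open(): index the broadcast old centers by id in a HashMap.
    void open(List<Center> broadcastOldCenters) {
        oldById.clear();
        for (Center c : broadcastOldCenters) {
            oldById.put(c.id, c);
        }
    }

    // Models the per-record call: keep a new center only if it moved more
    // than epsilon from its old position (or has no old counterpart).
    boolean filter(Center newCenter) {
        Center old = oldById.get(newCenter.id);
        if (old == null) {
            return true;
        }
        double dx = newCenter.x - old.x;
        double dy = newCenter.y - old.y;
        return Math.sqrt(dx * dx + dy * dy) > epsilon;
    }
}
```

Comparing against a small epsilon instead of exact equality avoids running forever on floating-point jitter in the averaged coordinates.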

Re: loop break operation

Pa Rö
I tried the following, but it always runs for maxIterations. Could someone review it?

    private int benchmarkCounter;
    private static int iterationCounter = 1;
    private static DataSet<GeoTimeDataCenter> oldCentroids;

   
    FlinkMain(int benchmarkCounter) {
        this.benchmarkCounter = benchmarkCounter;
    }
   
    public int getBenchmarkCounter() {
        return benchmarkCounter;
    }
       
    public void run() {   
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // check if centers been updated
        DataSet<GeoTimeDataCenter> checkCentroids = null;
        if(this.getBenchmarkCounter()==0) {
            oldCentroids = newCentroids;
        }
        else {
            try {
                HashMap<Integer, GeoTimeDataTupel> oldMap = new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : oldCentroids.collect()) oldMap.put(i.getId(),new GeoTimeDataTupel(i.getGeo(),i.getTime(),i.getidGDELT()));
                HashMap<Integer, GeoTimeDataTupel> newMap = new HashMap<Integer, GeoTimeDataTupel>();
                for (GeoTimeDataCenter i : newCentroids.collect()) newMap.put(i.getId(),new GeoTimeDataTupel(i.getGeo(),i.getTime(),i.getidGDELT()));
                if(!GeoTimeDataHelper.compareCentersMaps(oldMap, newMap)) {
                    checkCentroids = newCentroids;
                }
                else {
                    checkCentroids = null;
                }
                oldCentroids = newCentroids;
            }
            catch(Exception e) {
                e.printStackTrace();
            }
           
        }
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, checkCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
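A likely reason this always runs maxIterations: the if/else block (including the collect() calls) executes once on the client while the dataflow plan is being built, not once per superstep, so whatever ends up in checkCentroids (null, or a fixed DataSet) is decided before the iteration even starts. The criterion has to be part of the iteration itself, derived from newCentroids with loop broadcast as the old centers, and passed as loop.closeWith(newCentroids, criterion). The plain-Java model below (not Flink code; 1-D points and the names KMeansWithConvergence, step, run and Result are hypothetical) shows that structure: the "moved centers" criterion is rebuilt in every iteration.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (NOT Flink API code) of k-means whose convergence check is
// recomputed in every iteration, the way a termination-criterion DataSet is.
// 1-D points keep it short; all names here are hypothetical.
class KMeansWithConvergence {

    // Final centers plus the number of iterations that actually ran.
    static final class Result {
        final double[] centers;
        final int iterations;

        Result(double[] centers, int iterations) {
            this.centers = centers;
            this.iterations = iterations;
        }
    }

    // One k-means step: assign each point to its nearest center and average.
    // (In the Flink job this is the map / groupBy / reduceGroup / map chain.)
    static double[] step(double[] points, double[] centers) {
        double[] sums = new double[centers.length];
        int[] counts = new int[centers.length];
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centers.length; c++) {
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) {
                    best = c;
                }
            }
            sums[best] += p;
            counts[best]++;
        }
        double[] next = new double[centers.length];
        for (int c = 0; c < centers.length; c++) {
            // A center with no assigned points keeps its old position.
            next[c] = counts[c] == 0 ? centers[c] : sums[c] / counts[c];
        }
        return next;
    }

    static Result run(double[] points, double[] initialCenters,
                      int maxIterations, double epsilon) {
        double[] centers = initialCenters.clone();
        int done = 0;
        while (done < maxIterations) {
            double[] next = step(points, centers);
            // Termination criterion, rebuilt every iteration from old vs. new
            // centers: the indices of centers that moved more than epsilon.
            List<Integer> moved = new ArrayList<>();
            for (int c = 0; c < next.length; c++) {
                if (Math.abs(next[c] - centers[c]) > epsilon) {
                    moved.add(c);
                }
            }
            centers = next;
            done++;
            if (moved.isEmpty()) {
                break; // nothing moved: converged before maxIterations
            }
        }
        return new Result(centers, done);
    }
}
```

With points {1, 2, 3, 10, 11, 12}, initial centers {0, 5}, maxIterations = 100 and epsilon = 1e-9, this stops after 3 iterations with centers 2.0 and 11.0 instead of running all 100.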