filter as termination condition

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

filter as termination condition

Pa Rö
hello,

i have define a filter for the termination condition by k-means.
if i run my app it always compute only one iteration.

i think the problem is here:
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
or maybe the filter function:
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

best regards,
paul

my full code here:

    public void run() {   
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration with termination condition
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }
Reply | Threaded
Open this post in threaded view
|

Re: filter as termination condition

Stephan Ewen
Termination happens if the "termination criterion" data set is empty.

Maybe your filter is too aggressive and filters out everything, or the join is wrong and nothing joins...

On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <[hidden email]> wrote:
hello,

i have define a filter for the termination condition by k-means.
if i run my app it always compute only one iteration.

i think the problem is here:
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
or maybe the filter function:
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

best regards,
paul

my full code here:

    public void run() {   
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration with termination condition
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

Reply | Threaded
Open this post in threaded view
|

Re: filter as termination condition

Sachin Goel

It appears that you're returning true when the previous and current solution are the same. You should instead return false in that case, because this is when the iteration should terminate.
Further, instead of joining, it would be a good idea to broadcast the new solution to the old solution [or the other way around] and have some tolerance value instead of an exact equality check.

Cheers!
Sachin

-- Sachin Goel
Computer Science, IIT Delhi
m. +91-9871457685

On Jul 22, 2015 5:46 PM, "Stephan Ewen" <[hidden email]> wrote:
Termination happens if the "termination criterion" data set is empty.

Maybe your filter is too aggressive and filters out everything, or the join is wrong and nothing joins...

On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <[hidden email]> wrote:
hello,

i have define a filter for the termination condition by k-means.
if i run my app it always compute only one iteration.

i think the problem is here:
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
or maybe the filter function:
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

best regards,
paul

my full code here:

    public void run() {   
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration with termination condition
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

Reply | Threaded
Open this post in threaded view
|

Re: filter as termination condition

Till Rohrmann

Sachin is right that the filter has to be inverted. Furthermore, the join operation is not right here. You have to do a kind of a left outer join where you only keep the elements which join with NULL. Here is an example of how one could do it [1].

Cheers,
Till

[1] http://stackoverflow.com/questions/31558326/apache-flink-filter-as-termination-condition/31559947#31559947


On Wed, Jul 22, 2015 at 2:23 PM, Sachin Goel <[hidden email]> wrote:

It appears that you're returning true when the previous and current solution are the same. You should instead return false in that case, because this is when the iteration should terminate.
Further, instead of joining, it would be a good idea to broadcast the new solution to the old solution [or the other way around] and have some tolerance value instead of an exact equality check.

Cheers!
Sachin

-- Sachin Goel
Computer Science, IIT Delhi
m. <a href="tel:%2B91-9871457685" value="+919871457685" target="_blank">+91-9871457685

On Jul 22, 2015 5:46 PM, "Stephan Ewen" <[hidden email]> wrote:
Termination happens if the "termination criterion" data set is empty.

Maybe your filter is too aggressive and filters out everything, or the join is wrong and nothing joins...

On Tue, Jul 21, 2015 at 5:05 PM, Pa Rö <[hidden email]> wrote:
hello,

i have define a filter for the termination condition by k-means.
if i run my app it always compute only one iteration.

i think the problem is here:
DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
or maybe the filter function:
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }

best regards,
paul

my full code here:

    public void run() {   
        //load properties
        Properties pro = new Properties();
        FileSystem fs = null;
        try {
            pro.load(FlinkMain.class.getResourceAsStream("/config.properties"));
            fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")),new org.apache.hadoop.conf.Configuration());
        } catch (Exception e) {
            e.printStackTrace();
        }
       
        int maxIteration = Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = fs.getHomeDirectory()+pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = null;
        try {
            centroids = getCentroidDataSet(env);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter(this.getBenchmarkCounter())).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduceGroup(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager(this.getBenchmarkCounter()));
        // feed new centroids back into next iteration with termination condition
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids, newCentroids.join(loop).where("*").equalTo("*").filter(new MyFilter()));
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter(-1)).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    public static final class MyFilter implements FilterFunction<Tuple2<GeoTimeDataCenter, GeoTimeDataCenter>> {
       
        private static final long serialVersionUID = 5868635346889117617L;

        public boolean filter(Tuple2<GeoTimeDataCenter, GeoTimeDataCenter> tuple) throws Exception {
            if(tuple.f0.equals(tuple.f1)) {
                return true;
            }
            else {
                return false;
            }
        }
    }