k means - waiting for dataset

k means - waiting for dataset

Pa Rö
Hi Flink community,

I have implemented k-means for clustering temporal geo data, using the following GitHub example together with my own data structures:
https://github.com/apache/flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/clustering/KMeans.java

Now I have the problem that Flink reads the centroids from a file and then continues working in parallel. When I look at the results, I get the feeling that the program loads only one centroid point.

I work with Flink 0.8.1; if I update to 0.9 milestone 1, I get the following exception:
ERROR actor.OneForOneStrategy: exception during creation
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:218)
    at akka.actor.ActorCell.create(ActorCell.scala:578)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:425)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at akka.util.Reflect$.instantiate(Reflect.scala:65)
    at akka.actor.Props.newActor(Props.scala:337)
    at akka.actor.ActorCell.newActor(ActorCell.scala:534)
    at akka.actor.ActorCell.create(ActorCell.scala:560)
    ... 9 more

How can I tell Flink that it should wait until the dataset is loaded, and what does this exception mean?

Best regards,
Paul

Re: k means - waiting for dataset

Till Rohrmann
Hi Paul,

Could you share your code with us so that we can see whether there is an error?

Does this error also occur with 0.9-SNAPSHOT?

Cheers,
Till

Re: k means - waiting for dataset

Pa Rö
Hi,
the exception came with version 0.9.
With version 0.8.1 there was no exception, but the results are foobar.

Here is my main method:

public static void main(String[] args) {
        //load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 2;//Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
       
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Re: k means - waiting for dataset

Stephan Ewen
In reply to this post by Pa Rö
Hi!

This problem should not depend on any user code. There are no user-code dependent actors in Flink.

Is there more of the stack trace that you can send us? It looks like the core exception that is causing the issue is not part of the stack trace.

Greetings,
Stephan

Re: k means - waiting for dataset

Till Rohrmann
Concerning your first problem, that you only see one resulting centroid: your code looks good, modulo the parts you haven't posted.

However, your problem could simply be caused by a bad selection of initial centroids. If, for example, all centroids except for one don't get any points assigned, then only one centroid will survive the iteration step. How do you choose the initial centroids?

To check that all centroids are read, you can print the contents of the centroids DataSet. Furthermore, you can simply println the new centroids after each iteration step. In local mode you can then observe the computation, for example as sketched below.
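A minimal sketch against your snippet (it assumes GeoTimeDataCenter has a readable toString(); the extra identity map exists only for logging):

    // print the initial centroids to verify that all of them are read
    // (print() is a sink here, so the output appears when env.execute() runs)
    centroids.print();

    DataSet<GeoTimeDataCenter> newCentroids = points
        .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
        .groupBy(0).reduce(new CentroidAccumulator())
        .map(new CentroidAverager())
        // identity map that only logs each new centroid; handy in local mode
        .map(new MapFunction<GeoTimeDataCenter, GeoTimeDataCenter>() {
            public GeoTimeDataCenter map(GeoTimeDataCenter center) {
                System.out.println("new centroid: " + center);
                return center;
            }
        });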

Cheers,
Till

Re: k means - waiting for dataset

Pa Rö
Hi,
if I print the centroids, all of them show up in the output. I have implemented k-means with MapReduce and Spark; with the same input, both give the same output. But with Flink I get a one-cluster output for this input set. (I use CSV files from the GDELT project.)

Here is my class:

public class FlinkMain {
   
    public static void main(String[] args) {
        //load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 1;//Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    private static final class SelectNearestCenter extends RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>> {
       
        private static final long serialVersionUID = -2729445046389350264L;
        private Collection<GeoTimeDataCenter> centroids;
       
        @Override
        public void open(Configuration parameters) throws Exception {
            this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
        }
       
        @Override
        public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
            double minDistance = Double.MAX_VALUE;
            int closestCentroidId= -1;
           
            // check all cluster centers
            for(GeoTimeDataCenter centroid : centroids) {
                // compute distance
                double distance = Distance.ComputeDist(point, centroid);
                // update nearest cluster if necessary
                if(distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.getId();
                }
            }
            // emit a new record with the center id and the data point
            return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
        }
    }
   
    // reduces two points at a time by averaging them
    private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;
       
        public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1,val2.f1));
        }
    }
   
    private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2){
        long time = (input1.getTime()+input2.getTime())/2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo,time,"POINT");
    }
   
    // computes new centroid from coordinate sum and count of points
    private static final class CentroidAverager implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {

        private static final long serialVersionUID = -2687234478847261803L;

        public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
            return new GeoTimeDataCenter(value.f0, value.f1.getGeo(),value.f1.getTime());
        }
    }

    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009')
            //.fieldDelimiter("\t")
            //.lineDelimiter("\n")
            .includeFields(true, true, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, true, true
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false)
            //.includeFields(true,true,true,true)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }
   
    private static final class TuplePointConverter implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel>{
       
        private static final long serialVersionUID = 3485560278562719538L;

        public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
            return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
        }
    }

    private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String seedFile = pro.getProperty("seed.file");
        boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
        // get points from file or random
        if(seedFlag || !(new File(seedFile+"-1").exists())) {
            Seeding.randomSeeding();
        }
        // map csv file
        return env.readCsvFile(seedFile+"-1")
            .lineDelimiter("\n")
            .fieldDelimiter('\u0009')
            //.fieldDelimiter("\t")
            .includeFields(true, true, true, true)
            .types(Integer.class, Double.class, Double.class, Long.class)
            .map(new TupleCentroidConverter());
    }
   
    private static final class TupleCentroidConverter implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter>{

        private static final long serialVersionUID = -1046538744363026794L;

        public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
            return new GeoTimeDataCenter(t.f0,new LatLongSeriable(t.f1, t.f2), t.f3);
        }
    }
}

Re: k means - waiting for dataset

Pa Rö
I have fixed a bug in the input reading, but the results are still different.

I think I have located the problem: in the other implementations I sum all geo points/time points and then divide by the counter.
But in Flink I sum two points and divide by two, then sum the result with the next point, and so on...

The method is the following:

// reduces two points at a time by averaging them
    private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;
       
        public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f0,val1.f1,val2.f1));
        }
    }
   
    private static GeoTimeDataTupel addAndDiv(int clusterid,GeoTimeDataTupel input1, GeoTimeDataTupel input2){
        long time = (input1.getTime()+input2.getTime())/2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo,time,"POINT");
    }

How can I sum all points and divide by the counter?


Re: k means - waiting for dataset

Stephan Ewen
Sorry, I don't understand the question.

Can you describe a bit better what you mean by "how can I sum all points and divide by the counter"?

Thanks!

Re: k means - waiting for dataset

Pa Rö
Good evening,

sorry, my English is not the best.

To compute the new centroid, I want to sum all points of the cluster and form the new center.
In my other implementations I first sum all points and only at the end divide by the number of points.
For example: (1+2+3+4)/4 = 2.5

In Flink I always reduce two points to one;
for the example above: (1+2)/2 = 1.5 --> (1.5+3)/2 = 2.25 --> (2.25+4)/2 = 3.125

How can I rewrite my function so that it works like my other implementations?
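
Would the pattern from the linked KMeans example be the right direction? There the division happens only once, because a count is carried through the reduce. A rough sketch with my types (add() and div() are hypothetical helpers I would still have to write; they would sum, respectively divide, the geo coordinates and the time component-wise):

    // append an initial count of 1 to every (clusterId, point) pair
    private static final class CountAppender
            implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, Tuple3<Integer, GeoTimeDataTupel, Long>> {
        public Tuple3<Integer, GeoTimeDataTupel, Long> map(Tuple2<Integer, GeoTimeDataTupel> t) {
            return new Tuple3<Integer, GeoTimeDataTupel, Long>(t.f0, t.f1, 1L);
        }
    }

    // sum the points and add the counts; no division here, so the reduce
    // stays associative and the order of the pairwise reduction does not matter
    private static final class CentroidAccumulator
            implements ReduceFunction<Tuple3<Integer, GeoTimeDataTupel, Long>> {
        public Tuple3<Integer, GeoTimeDataTupel, Long> reduce(
                Tuple3<Integer, GeoTimeDataTupel, Long> v1,
                Tuple3<Integer, GeoTimeDataTupel, Long> v2) {
            // add(...): hypothetical helper that sums lat/long and time component-wise
            return new Tuple3<Integer, GeoTimeDataTupel, Long>(v1.f0, add(v1.f1, v2.f1), v1.f2 + v2.f2);
        }
    }

    // divide the summed point by the total count exactly once
    private static final class CentroidAverager
            implements MapFunction<Tuple3<Integer, GeoTimeDataTupel, Long>, GeoTimeDataCenter> {
        public GeoTimeDataCenter map(Tuple3<Integer, GeoTimeDataTupel, Long> v) {
            // div(...): hypothetical helper that divides lat/long and time by the count
            GeoTimeDataTupel mean = div(v.f1, v.f2);
            return new GeoTimeDataCenter(v.f0, mean.getGeo(), mean.getTime());
        }
    }

The pipeline would then get one extra map before the groupBy:

    .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
    .map(new CountAppender())
    .groupBy(0).reduce(new CentroidAccumulator())
    .map(new CentroidAverager());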

Best regards,
Paul


Am 22.05.2015 um 16:52 schrieb Stephan Ewen:
Sorry, I don't understand the question.

Can you describe a bit better what you mean with "how i can sum all points and share thoug the counter" ?

Thanks!

On Fri, May 22, 2015 at 2:06 PM, Pa Rö <[hidden email]> wrote:
i have fix a bug at the input reading, but the results are still different.

i think i have local the problem, in the other implementation i sum all geo points/time points and share thougt the counter.
but in flink i sum two points and share thougt two, and sum the next...

the method is the following:

// sums and counts point coordinates
    private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;
       
        public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f0,val1.f1,val2.f1));
        }
    }
   
    private static GeoTimeDataTupel addAndDiv(int clusterid,GeoTimeDataTupel input1, GeoTimeDataTupel input2){
        long time = (input1.getTime()+input2.getTime())/2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo,time,"POINT");
    }

how i can sum all points and share thoug the counter?


2015-05-22 9:53 GMT+02:00 Pa Rö <[hidden email]>:
hi,
if i print the centroids all are show in the output. i have implement k means with map reduce und spark. by same input, i get the same output. but in flink i get a one cluster output with this input set. (i use csv files from the GDELT projekt)

here my class:

public class FlinkMain {

   
    public static void main(String[] args) {
        //load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        int maxIteration = 1;//Integer.parseInt(pro.getProperty("maxiterations"));
        String outputPath = pro.getProperty("flink.output");
        // set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // get input points
        DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
        DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
        // set number of bulk iterations for KMeans algorithm
        IterativeDataSet<GeoTimeDataCenter> loop = centroids.iterate(maxIteration);
        DataSet<GeoTimeDataCenter> newCentroids = points
            // compute closest centroid for each point
            .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
            // count and sum point coordinates for each centroid
            .groupBy(0).reduce(new CentroidAccumulator())
            // compute new centroids from point counts and coordinate sums
            .map(new CentroidAverager());
        // feed new centroids back into next iteration
        DataSet<GeoTimeDataCenter> finalCentroids = loop.closeWith(newCentroids);
        DataSet<Tuple2<Integer, GeoTimeDataTupel>> clusteredPoints = points
            // assign points to final clusters
            .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        // emit result
        clusteredPoints.writeAsCsv(outputPath+"/points", "\n", " ");
        finalCentroids.writeAsText(outputPath+"/centers");//print();
        // execute program
        try {
            env.execute("KMeans Flink");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
   
    private static final class SelectNearestCenter extends RichMapFunction<GeoTimeDataTupel,Tuple2<Integer,GeoTimeDataTupel>> {
       
        private static final long serialVersionUID = -2729445046389350264L;
        private Collection<GeoTimeDataCenter> centroids;
       
        @Override
        public void open(Configuration parameters) throws Exception {
            this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
        }
       
        @Override
        public Tuple2<Integer, GeoTimeDataTupel> map(GeoTimeDataTupel point) throws Exception {
            double minDistance = Double.MAX_VALUE;
            int closestCentroidId= -1;
           
            // check all cluster centers
            for(GeoTimeDataCenter centroid : centroids) {
                // compute distance
                double distance = Distance.ComputeDist(point, centroid);
                // update nearest cluster if necessary
                if(distance < minDistance) {
                    minDistance = distance;
                    closestCentroidId = centroid.getId();
                }
            }
            // emit a new record with the center id and the data point
            return new Tuple2<Integer, GeoTimeDataTupel>(closestCentroidId, point);
        }
    }
   
    // NOTE: reduces two points at a time and averages them immediately; no count is kept
    private static final class CentroidAccumulator implements ReduceFunction<Tuple2<Integer, GeoTimeDataTupel>> {

        private static final long serialVersionUID = -4868797820391121771L;
       
        public Tuple2<Integer, GeoTimeDataTupel> reduce(Tuple2<Integer, GeoTimeDataTupel> val1, Tuple2<Integer, GeoTimeDataTupel> val2) {
            return new Tuple2<Integer, GeoTimeDataTupel>(val1.f0, addAndDiv(val1.f1,val2.f1));
        }
    }
   
    private static GeoTimeDataTupel addAndDiv(GeoTimeDataTupel input1, GeoTimeDataTupel input2){
        long time = (input1.getTime()+input2.getTime())/2;
        List<LatLongSeriable> list = new ArrayList<LatLongSeriable>();
        list.add(input1.getGeo());
        list.add(input2.getGeo());
        LatLongSeriable geo = Geometry.getGeoCenterOf(list);

        return new GeoTimeDataTupel(geo,time,"POINT");
    }
   
    // wraps the (already pairwise-averaged) point into a new centroid; no count is involved
    private static final class CentroidAverager implements MapFunction<Tuple2<Integer, GeoTimeDataTupel>, GeoTimeDataCenter> {

        private static final long serialVersionUID = -2687234478847261803L;

        public GeoTimeDataCenter map(Tuple2<Integer, GeoTimeDataTupel> value) {
            return new GeoTimeDataCenter(value.f0, value.f1.getGeo(),value.f1.getTime());
        }
    }

    private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String inputFile = pro.getProperty("input");
        // map csv file
        return env.readCsvFile(inputFile)
            .ignoreInvalidLines()
            .fieldDelimiter('\u0009')
            //.fieldDelimiter("\t")
            //.lineDelimiter("\n")
            .includeFields(true, true, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false, true, true
                    , false, false, false, false, false, false, false, false, false, false
                    , false, false, false, false, false, false, false, false)
            //.includeFields(true,true,true,true)
            .types(String.class, Long.class, Double.class, Double.class)
            .map(new TuplePointConverter());
    }
   
    private static final class TuplePointConverter implements MapFunction<Tuple4<String, Long, Double, Double>, GeoTimeDataTupel>{
       
        private static final long serialVersionUID = 3485560278562719538L;

        public GeoTimeDataTupel map(Tuple4<String, Long, Double, Double> t) throws Exception {
            return new GeoTimeDataTupel(new LatLongSeriable(t.f2, t.f3), t.f1, t.f0);
        }
    }

    private static DataSet<GeoTimeDataCenter> getCentroidDataSet(ExecutionEnvironment env) {
        // load properties
        Properties pro = new Properties();
        try {
            pro.load(new FileInputStream("./resources/config.properties"));
        } catch (Exception e) {
            e.printStackTrace();
        }
        String seedFile = pro.getProperty("seed.file");
        boolean seedFlag = Boolean.parseBoolean(pro.getProperty("seed.flag"));
        // get points from file or random
        if(seedFlag || !(new File(seedFile+"-1").exists())) {
            Seeding.randomSeeding();
        }
        // map csv file
        return env.readCsvFile(seedFile+"-1")
            .lineDelimiter("\n")
            .fieldDelimiter('\u0009')
            //.fieldDelimiter("\t")
            .includeFields(true, true, true, true)
            .types(Integer.class, Double.class, Double.class, Long.class)
            .map(new TupleCentroidConverter());
    }
   
    private static final class TupleCentroidConverter implements MapFunction<Tuple4<Integer, Double, Double, Long>, GeoTimeDataCenter>{

        private static final long serialVersionUID = -1046538744363026794L;

        public GeoTimeDataCenter map(Tuple4<Integer, Double, Double, Long> t) throws Exception {
            return new GeoTimeDataCenter(t.f0,new LatLongSeriable(t.f1, t.f2), t.f3);
        }
    }
}

2015-05-21 14:22 GMT+02:00 Till Rohrmann <[hidden email]>:
Concerning your first problem, that you only see one resulting centroid: your code looks good, modulo the parts you haven't posted.

However, your problem could simply be caused by a bad selection of initial centroids. If, for example, all centroids except one don't get any points assigned, then only one centroid will survive the iteration step. How do you select the initial centroids?

To check that all centroids are read you can print the contents of the centroids DataSet. Furthermore, you can simply println the new centroids after each iteration step. In local mode you can then observe the computation.
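
For example (an illustrative line, not from the thread; the program above already hints at print() in a comment):

// inspect what was actually read before starting the iteration
DataSet<GeoTimeDataCenter> centroids = getCentroidDataSet(env);
centroids.print(); // in local mode the contents appear on stdout when the job runs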

Cheers,
Till

On Thu, May 21, 2015 at 12:23 PM, Stephan Ewen <[hidden email]> wrote:
Hi!

This problem should not depend on any user code. There are no user-code dependent actors in Flink.

Is there more of the stack trace that you can send us? It looks like the core exception that is causing the issue is not part of the stack trace.

Greetings,
Stephan





Reply | Threaded
Open this post in threaded view
|

Re: k means - waiting for dataset

Fabian Hueske-2
There are two ways to do that:

1) You use a GroupReduceFunction, which gives you an iterator over all points, similar to Hadoop's Reducer.
2) You use the ReduceFunction to compute the sum and the count at the same time (e.g., in two fields of a Tuple2) and use a MapFunction to do the final division.

I'd go with the first choice. It's easier.

Best, Fabian


Reply | Threaded
Open this post in threaded view
|

Re: k means - waiting for dataset

Pa Rö
Thanks for your message.

Maybe you can give me an example for the GroupReduceFunction?

Reply | Threaded
Open this post in threaded view
|

Re: k means - waiting for dataset

Fabian Hueske-2
Sure,

here is some pseudo code:

public class CentroidMover implements GroupReduceFunction<Point, Centroid> {

  public void reduce(Iterable<Point> points, Collector<Centroid> out) {
    int cnt = 0;
    Centroid sum = new Centroid(0, 0);
    for (Point p : points) {
      sum = sum + p; // (your addition logic goes here)
      cnt++;
    }
    out.collect(sum / cnt); // your division logic goes here
  }
}

This function computes the sum and the count of a group and the final average.

Is this what you are looking for?
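
To wire such a function into the program from earlier in the thread, one would call reduceGroup instead of reduce plus map (an illustrative sketch; CentroidMover here stands for a version adapted to the GeoTimeDataTupel/GeoTimeDataCenter types):

DataSet<GeoTimeDataCenter> newCentroids = points
    .map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
    .groupBy(0)
    .reduceGroup(new CentroidMover()); // replaces .reduce(...).map(...)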
