Hello all,

I have modified the KMeans code to detect outliers. I can print the output to the console, but I am not able to write it to a file using the provided writeAsCsv method. The problem is that I generate a list of tuples. My list is:

List<Tuple3> finalElements = new ArrayList<Tuple3>();

The elements added to the list have the following type:

Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
finalElements.add(newElement);

Now I am stuck on how to convert this finalElements list to DataSet<Tuple3<Integer, Point, Boolean>> fElements, so that I could call:

fElements.writeAsCsv(outputPath, "\n", " ");

Best Regards,
Subash Basnet
Assuming your ExecutionEnvironment is named env, simply call:

DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.fromCollection(finalElements);

Does this help?

On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <[hidden email]> wrote:
BR,
Stefano Baghino
Hello Stefano,

Yes, the conversion worked, thank you. But I am still not able to write the DataSet to a file. The default code below, which writes the KMeans points along with their centroid numbers to the file, works fine:

// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

DataSet<Tuple2<Integer, Point>> clusteredPoints = points
    // assign points to final clusters
    .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

if (fileOutput) {
    clusteredPoints.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

But my modified code below, which finds outliers, does not:

// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);

DataSet<Tuple2<Integer, Point>> clusteredPoints = points
    // assign points to final clusters
    .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");

DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));

if (fileOutput) {
    fElements.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

It does not write to the file: the result folder is not created inside the kmeans folder where my centers and points files are located. I am only able to print it to the console via fElements.print(). Does it have something to do with env.execute(""), which must be set somewhere in the first case but not in mine?

Best Regards,
Subash Basnet

On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <[hidden email]> wrote:
Hi Subash,

how is findOutliers implemented? It might be that you are mixing up local and cluster computation. All DataSets are processed in the cluster. Please note the following:
- ExecutionEnvironment.fromCollection() transforms a client-local collection into a DataSet by serializing it and sending it to the cluster.

So, does findOutliers operate on the cluster or on the local client, i.e., does it work with DataSets and send the result back as a collection, or does it first collect the results as a collection and operate on these?

Best, Fabian

2016-02-10 12:13 GMT+01:00 subash basnet <[hidden email]>:
Hello Fabian,
I use the collect() method to get the elements locally, perform the operations on them, and return the result as a collection. The collection result is converted to a DataSet in the calling method. Below is the code of the findOutliers method:

public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
        DataSet<Centroid> centroids) throws Exception {
    List<Tuple3> finalElements = new ArrayList<Tuple3>();
    List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
    List<Centroid> centroidList = centroids.collect();
    List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
        new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
    for (Centroid centroid : centroidList) {
        elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
        double totalDistance = 0;
        int elementsCount = 0;
        for (Tuple2<Integer, Point> e : elements) {
            // compute distance
            if (e.f0 == centroid.id) {
                Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
                    new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
                double distance = e.f1.euclideanDistance(centroid);
                totalDistance += distance;
                newElement.setFields(centroid, e, distance);
                elementsWithDistance.add(newElement);
                elementsCount++;
            }
        }
        // finding mean
        double mean = totalDistance / elementsCount;
        double sdTotalDistanceSquare = 0;
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
            sdTotalDistanceSquare += distanceSquare;
        }
        double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
        double upperlimit = mean + 2 * sd;
        double lowerlimit = mean - 2 * sd;
        Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>(); // true = outlier
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            newElement = new Tuple3<Integer, Point, Boolean>();
            if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 > upperlimit) {
                // set as outlier
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, true);
            } else {
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, false);
            }
            finalElements.add(newElement);
        }
    }
    return finalElements;
}

I have attached the screenshot of my project structure and the KMeansOutlierDetection.java file for more clarity.

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <[hidden email]> wrote:
Attachments: project_structure.png (491K), KMeansOutlierDetection.java (19K)
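The statistic inside findOutliers can be separated from the collect()/List plumbing around it. Below is a minimal plain-Java sketch (no Flink types; the class ClusterOutlierRule and its method flag are hypothetical names) that isolates the per-cluster rule: a point is flagged when its distance to its centroid falls outside mean ± k·sd of its own cluster's distances, with the population standard deviation as in the code above. A function of this shape, taking all distances of one cluster, is the kind of logic that could run per cluster inside a Flink group-wise reduce instead of on the client; that mapping is an assumption, not something shown in this thread.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Sketch of the per-cluster rule from findOutliers: a distance is an outlier when it lies
 * outside [mean - k*sd, mean + k*sd] of its own cluster (population sd, i.e. divide by n).
 */
class ClusterOutlierRule {

    /** Returns one flag per distance, in input order; true = outlier. */
    static List<Boolean> flag(List<Double> distances, double k) {
        List<Boolean> flags = new ArrayList<>();
        int n = distances.size();
        if (n == 0) {
            return flags; // empty cluster: nothing to flag, and no 0/0 mean is computed
        }
        double sum = 0;
        for (double d : distances) {
            sum += d;
        }
        double mean = sum / n;
        double squares = 0;
        for (double d : distances) {
            squares += (d - mean) * (d - mean);
        }
        double sd = Math.sqrt(squares / n);
        double lower = mean - k * sd;
        double upper = mean + k * sd;
        for (double d : distances) {
            flags.add(d < lower || d > upper);
        }
        return flags;
    }
}
```

For example, with nine distances of 1.0 and one of 10.0 and k = 2, the mean is 1.9 and sd is 2.7, so the upper limit is 7.3 and only the 10.0 is flagged. One property worth knowing: with the population standard deviation, no value can lie more than √(n−1)·sd from the mean of n values, so at k = 2 a cluster with five or fewer points never yields an outlier.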
Hi Subash,

I would not fetch the data to the client, do the computation there, and send it back, just for the purpose of writing it to a file. Either

1) pull the results to the client and write the file from there, or
2) compute the outliers in the cluster.

I did not study your code completely, but the two nested loops and the condition are, for example, a join. I would go for option 2, if possible.

Best, Fabian

2016-02-10 13:07 GMT+01:00 subash basnet <[hidden email]>:
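Fabian's remark that the two nested loops plus the `e.f0 == centroid.id` test form a join can be made concrete. The plain-Java sketch below uses hypothetical 1-D stand-ins for Point and Centroid (so the Euclidean distance reduces to an absolute difference) and compares the nested-loop version with a hash join keyed on the cluster id. Both produce the same (point, centroid) pairs, only in a different order, but the hash join visits each point once instead of once per centroid. In Flink's DataSet API the corresponding step would be a join of clusteredPoints with the centroids, keyed on field 0 of the Tuple2 and on the centroid id; the exact key expression for Centroid depends on how that class is defined and is an assumption here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: nested loops with an equality test on the cluster id are an equi-join. */
class EquiJoinSketch {

    /** Hypothetical 1-D stand-in for Tuple2<Integer, Point>: (clusterId, x). */
    static final class Assigned {
        final int clusterId;
        final double x;
        Assigned(int clusterId, double x) { this.clusterId = clusterId; this.x = x; }
    }

    /** Hypothetical 1-D stand-in for Centroid: (id, x). */
    static final class Center {
        final int id;
        final double x;
        Center(int id, double x) { this.id = id; this.x = x; }
    }

    /** Shape of findOutliers: scans every point once per centroid, O(|centers| * |points|). */
    static List<Double> nestedLoopDistances(List<Center> centers, List<Assigned> points) {
        List<Double> out = new ArrayList<>();
        for (Center c : centers) {
            for (Assigned p : points) {
                if (p.clusterId == c.id) {
                    out.add(Math.abs(p.x - c.x));
                }
            }
        }
        return out;
    }

    /** Same pairs via a hash join: build a map on the centroid id once, then probe it per point. */
    static List<Double> hashJoinDistances(List<Center> centers, List<Assigned> points) {
        Map<Integer, Center> byId = new HashMap<>();
        for (Center c : centers) {
            byId.put(c.id, c);
        }
        List<Double> out = new ArrayList<>();
        for (Assigned p : points) {
            Center c = byId.get(p.clusterId);
            if (c != null) {
                out.add(Math.abs(p.x - c.x));
            }
        }
        return out;
    }
}
```

Note the output order: the nested loops group results by centroid, while the hash join follows the order of the points, so only the multiset of results is guaranteed to match.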
Hello Fabian,

As written before, my code is:

DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
fElements.writeAsCsv(outputPath, "\n", " ");
env.execute("KMeans Example");

I am very new to Flink, so I am not clear about what you suggested. By option 1, do you mean that I should write my own FileWriter here rather than using the writeAsCsv() method? And for option 2, I don't understand where to compute the outliers. I would like to use the writeAsCsv() method, but currently it doesn't perform the write operation and I am unable to understand why.

An interesting thing I found: when I run the outlierDetection class from Eclipse, a single result file gets written within the kmeans folder, whereas the default KMeans class writes a result folder within the kmeans folder, and the files with the points are written inside that result folder. I pass the necessary paths as arguments when running, e.g.:

file:///home/softwares/flink-0.10.0/kmeans/points file:///home/softwares/flink-0.10.0/kmeans/centers file:///home/softwares/flink-0.10.0/kmeans/result 10

Now, after I create runnable jar files for the KMeans and outlierDetection classes and upload them to the Flink web submission client, it works fine for KMeans.jar: the folder and files get created. But in the case of outlierDetection.jar, no file or folder gets written inside kmeans. How is it that the outlier class is able to write the file when run from Eclipse, but the outlier jar is not when submitted via the Flink web submission client?

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske <[hidden email]> wrote:
I would try to do the outlier computation with the DataSet API instead of fetching the results to the client with collect(). If you do that, you can directly use writeAsCsv, because the result is still a DataSet.

2016-02-10 18:29 GMT+01:00 subash basnet <[hidden email]>:
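Keeping the computation in the DataSet API means expressing the per-cluster mean and standard deviation as a keyed aggregation rather than as loops over collected lists. The sketch below is a plain-Java model of that dataflow, not Flink code, and its names (KeyedOutlierFlow, Stats, flag) are hypothetical. It starts from (clusterId, distance) records, such as those obtained by joining each point with its centroid, merges per-cluster (count, sum, sum of squares) triples, then looks the statistics back up by cluster id and flags each record. Because merging is associative, the aggregation step has the shape of a Flink groupBy(...).reduce(...); the look-up step corresponds to joining the statistics back on the cluster id, followed by a map that would emit the Tuple3<Integer, Point, Boolean> records, which remain a DataSet and could be written with writeAsCsv. How exactly this maps onto operators in a given Flink version is an assumption to verify against the documentation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model of a keyed outlier dataflow (not Flink code):
 * (clusterId, distance) -> per-cluster (n, sum, sumSq) -> stats looked up by clusterId -> flag.
 */
class KeyedOutlierFlow {

    /** Per-cluster partial aggregate; merge() is associative, like a combinable reduce. */
    static final class Stats {
        final long n;
        final double sum;
        final double sumSq;
        Stats(long n, double sum, double sumSq) { this.n = n; this.sum = sum; this.sumSq = sumSq; }
        static Stats of(double d) { return new Stats(1, d, d * d); }
        Stats merge(Stats o) { return new Stats(n + o.n, sum + o.sum, sumSq + o.sumSq); }
        double mean() { return sum / n; }
        /** Population sd from the one-pass formula; clamped at 0 in case rounding goes negative. */
        double sd() {
            double m = mean();
            return Math.sqrt(Math.max(0.0, sumSq / n - m * m));
        }
    }

    /** Returns one flag per input record, in input order; true = outside mean ± k*sd of its cluster. */
    static List<Boolean> flag(int[] clusterIds, double[] distances, double k) {
        // Aggregation step (roughly: group by cluster id, then reduce with merge).
        Map<Integer, Stats> stats = new HashMap<>();
        for (int i = 0; i < clusterIds.length; i++) {
            stats.merge(clusterIds[i], Stats.of(distances[i]), Stats::merge);
        }
        // Look-up step (roughly: join stats back on cluster id, then map each record to a flag).
        List<Boolean> flags = new ArrayList<>();
        for (int i = 0; i < clusterIds.length; i++) {
            Stats s = stats.get(clusterIds[i]);
            double lower = s.mean() - k * s.sd();
            double upper = s.mean() + k * s.sd();
            flags.add(distances[i] < lower || distances[i] > upper);
        }
        return flags;
    }
}
```

The one-pass sum-of-squares formula is what makes the aggregation mergeable, at the cost of some numerical robustness compared with the two-pass computation in findOutliers when distances are large and tightly clustered.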
Hello Fabian,

Thank you for the response, but I am stuck on how to iterate over the DataSet, perform operations, and return a new modified DataSet, similar to the list operations shown below. Currently I am doing the following:

for (Centroid centroid : centroids.collect()) {
    for (Tuple2<Integer, Point> element : clusteredPoints.collect()) {
        // perform necessary operations
    }
    // add elements
}
// return elements list

It would be really nice if I could just get started. I have been trying to add elements to a DataSet using join, but when I print the DataSet it contains only one element: it prints the same value as the initial one.

for (....) {
    newElement = new Tuple3<Integer, Point, Boolean>();
    dataSetElement.join(env.fromElements(newElement));
    dataSetElement.print();
}

I am unsure whether I am using the right function or using join in the wrong way.

Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske <[hidden email]> wrote: