How to convert List to flink DataSet


How to convert List to flink DataSet

subashbasnet
Hello all,

I have modified the KMeans example code to detect outliers. I can print the output to the console, but I am not able to write it to a file using the provided 'writeAsCsv' method.
The problem is that I generate a list of tuples.
My list is:
List<Tuple3> finalElements = new ArrayList<Tuple3>();
The elements added to the list have the following type:
Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
finalElements.add(newElement);
Now I am stuck on how to convert 'finalElements' to a DataSet<Tuple3<Integer, Point, Boolean>> fElements,
so that I can call
fElements.writeAsCsv(outputPath, "\n", " ");

Best Regards,
Subash Basnet

Re: How to convert List to flink DataSet

stefanobaghino
Assuming your ExecutionEnvironment is named `env`, simply call:

DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.fromCollection(finalElements);

Does this help?
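
Editor's note: `fromCollection` is generic in the element type of the collection it receives, so the resulting DataSet can only be as precisely typed as the list passed in. The plain-Java sketch below illustrates that point without Flink on the classpath. `Triple` and `wrap` are hypothetical stand-ins (they are not Flink classes): `wrap` plays the role of a generic "collection in, typed container out" method, and a `String` stands in for `Point`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a three-field tuple; not the Flink Tuple3 class.
final class Triple<A, B, C> {
    final A f0;
    final B f1;
    final C f2;

    Triple(A f0, B f1, C f2) {
        this.f0 = f0;
        this.f1 = f1;
        this.f2 = f2;
    }
}

final class TypedListSketch {
    // Hypothetical generic method: the element type T of the result
    // is taken from the element type of the argument.
    static <T> List<T> wrap(Collection<T> data) {
        return new ArrayList<>(data);
    }

    static List<Triple<Integer, String, Boolean>> build() {
        // Fully parameterized list: no raw Triple anywhere, so T is inferred precisely.
        List<Triple<Integer, String, Boolean>> finalElements = new ArrayList<>();
        finalElements.add(new Triple<>(1, "point", false));
        // Had finalElements been declared as List<Triple> (raw element type),
        // the result of wrap would be a List<Triple> and this declaration
        // would not type-check.
        List<Triple<Integer, String, Boolean>> typed = wrap(finalElements);
        return typed;
    }

    public static void main(String[] args) {
        System.out.println(build().size());
    }
}
```

Under that assumption, declaring the list in the question with its full type parameters instead of the raw `Tuple3` should let the call above produce the fully typed DataSet directly.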

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: How to convert List to flink DataSet

subashbasnet
Hello Stefano,

Yes, the type casting worked, thank you. But I am still not able to write the DataSet to a file.

The default code below, which writes the KMeans points along with their centroid numbers to the file, works fine:
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
DataSet<Tuple2<Integer, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
if (fileOutput) {
    clusteredPoints.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

But my modified code to find outliers, shown below, does not:
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
DataSet<Tuple2<Integer, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
if (fileOutput) {
    fElements.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

It does not write to the file: the result folder is not created inside the kmeans folder where my centers and points files are located. I am only able to print the result to the console via fElements.print();

Does it have something to do with env.execute(""), which must be set somewhere in the previous case but not in mine?



Best Regards,
Subash Basnet



Re: How to convert List to flink DataSet

Fabian Hueske-2
Hi Subash,

How is findOutliers implemented?

It might be that you are mixing up local and cluster computation. All DataSets are processed in the cluster. Please note the following:
- ExecutionEnvironment.fromCollection() transforms a client-local collection into a DataSet by serializing it and sending it to the cluster.
- DataSet.collect() transforms a DataSet into a collection and ships it back to the client.

So, does findOutliers operate on the cluster or on the local client? That is, does it work with DataSets and send the result back as a collection, or does it first collect the results as a collection and operate on these?

Best, Fabian


Re: How to convert List to flink DataSet

subashbasnet
Hello Fabian,

I use the collect() method to fetch the elements locally, perform the operations on them, and return the result as a collection. The calling method then converts that collection to a DataSet.
Below is the code of the findOutliers method:

public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
        DataSet<Centroid> centroids) throws Exception {
    List<Tuple3> finalElements = new ArrayList<Tuple3>();
    List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
    List<Centroid> centroidList = centroids.collect();
    List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
            new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
    for (Centroid centroid : centroidList) {
        elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
        double totalDistance = 0;
        int elementsCount = 0;
        for (Tuple2<Integer, Point> e : elements) {
            // compute distance
            if (e.f0 == centroid.id) {
                Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
                        new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
                double distance = e.f1.euclideanDistance(centroid);
                totalDistance += distance;
                newElement.setFields(centroid, e, distance);
                elementsWithDistance.add(newElement);
                elementsCount++;
            }
        }
        // finding mean
        double mean = totalDistance / elementsCount;
        double sdTotalDistanceSquare = 0;
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
            sdTotalDistanceSquare += distanceSquare;
        }
        double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
        double upperlimit = mean + 2 * sd;
        double lowerlimit = mean - 2 * sd;
        // third field: true = outlier
        Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            newElement = new Tuple3<Integer, Point, Boolean>();
            if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 > upperlimit) {
                // set as outlier
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, true);
            } else {
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, false);
            }
            finalElements.add(newElement);
        }
    }
    return finalElements;
}

I have attached herewith the screenshot of my project structure and KMeansOutlierDetection.java file for more clarity. 


Best Regards,
Subash Basnet

project_structure.png (491K) Download Attachment
KMeansOutlierDetection.java (19K) Download Attachment

Re: How to convert List to flink DataSet

Fabian Hueske-2
Hi Subash,

I would not fetch the data to the client, do the computation there, and send it back just for the purpose of writing it to a file.

Either 1) pull the results to the client and write the file from there, or 2) compute the outliers in the cluster.
I did not study your code completely, but the two nested loops together with the condition amount to a join, for example.

I would go for option 2, if possible.
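
Editor's note: the remark that two nested loops plus an equality condition form a join can be sketched in plain Java. The example compares the nested-loop form used in findOutliers with a keyed lookup that pairs each point with its centroid by id. `Centroid` and `AssignedPoint` here are hypothetical two-dimensional stand-ins for the classes in the thread, and nothing in the block is Flink API; it only shows that both forms produce the same pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class JoinSketch {
    // Hypothetical stand-in for the thread's Centroid class.
    static final class Centroid {
        final int id;
        final double x;
        final double y;

        Centroid(int id, double x, double y) {
            this.id = id;
            this.x = x;
            this.y = y;
        }
    }

    // Hypothetical stand-in for a Tuple2<Integer, Point>: a point plus its cluster id.
    static final class AssignedPoint {
        final int clusterId;
        final double x;
        final double y;

        AssignedPoint(int clusterId, double x, double y) {
            this.clusterId = clusterId;
            this.x = x;
            this.y = y;
        }
    }

    static double distance(AssignedPoint p, Centroid c) {
        return Math.hypot(p.x - c.x, p.y - c.y);
    }

    // Nested loops with an equality condition, as in findOutliers.
    static Map<Integer, List<Double>> nestedLoops(List<AssignedPoint> points, List<Centroid> centroids) {
        Map<Integer, List<Double>> out = new TreeMap<>();
        for (Centroid c : centroids) {
            for (AssignedPoint p : points) {
                if (p.clusterId == c.id) {
                    out.computeIfAbsent(c.id, k -> new ArrayList<>()).add(distance(p, c));
                }
            }
        }
        return out;
    }

    // The same pairing expressed as a join on the key: index one side, probe with the other.
    static Map<Integer, List<Double>> keyedJoin(List<AssignedPoint> points, List<Centroid> centroids) {
        Map<Integer, Centroid> byId = new HashMap<>();
        for (Centroid c : centroids) {
            byId.put(c.id, c);
        }
        Map<Integer, List<Double>> out = new TreeMap<>();
        for (AssignedPoint p : points) {
            Centroid c = byId.get(p.clusterId);
            if (c != null) {
                out.computeIfAbsent(c.id, k -> new ArrayList<>()).add(distance(p, c));
            }
        }
        return out;
    }
}
```

Once the logic is phrased as "match on cluster id, then compute per pair", it no longer depends on having both collections in client memory, which is what makes it a candidate for running in the cluster.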

Best, Fabian



Re: How to convert List to flink DataSet

subashbasnet
Hello Fabian,

As shown before, the code is:
DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
fElements.writeAsCsv(outputPath, "\n", " ");
env.execute("KMeans Example");

I am very new to Flink, so I am not clear about what you suggested. By option (1), do you mean that I should write my own FileWriter here rather than use the writeAsCsv() method? As for option (2), I could not understand where to compute the outliers. I would like to use the writeAsCsv() method, but currently it does not perform the write operation, and I am unable to understand why.
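
Editor's note: option (1), read as "write the file from the client", could look roughly like the plain-Java sketch below. It assumes the results are already a local list (for example the list returned by findOutliers) and writes them with `java.nio.file.Files`, using the same "\n" row delimiter and " " field delimiter as the writeAsCsv call above. `Row` is a hypothetical stand-in for Tuple3<Integer, Point, Boolean> with a two-dimensional point; the output format of Flink's own writer for a `Point` field is not reproduced here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class ClientSideCsvSketch {
    // Hypothetical row type: cluster id, point coordinates, outlier flag.
    static final class Row {
        final int clusterId;
        final double x;
        final double y;
        final boolean outlier;

        Row(int clusterId, double x, double y, boolean outlier) {
            this.clusterId = clusterId;
            this.x = x;
            this.y = y;
            this.outlier = outlier;
        }
    }

    // Builds the file content: one row per element, fields joined by fieldDelimiter,
    // every row terminated by rowDelimiter.
    static String format(List<Row> rows, String rowDelimiter, String fieldDelimiter) {
        StringBuilder sb = new StringBuilder();
        for (Row r : rows) {
            sb.append(r.clusterId).append(fieldDelimiter)
              .append(r.x).append(fieldDelimiter)
              .append(r.y).append(fieldDelimiter)
              .append(r.outlier).append(rowDelimiter);
        }
        return sb.toString();
    }

    // Writes the rows to a single local file on the client machine.
    static void write(Path target, List<Row> rows) throws IOException {
        Files.write(target, format(rows, "\n", " ").getBytes(StandardCharsets.UTF_8));
    }
}
```

This runs entirely in the client JVM, so the file lands on the machine that runs the program rather than on the cluster's workers.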

An interesting thing I found is that when I run the outlierDetection class from Eclipse, a single result file gets written within the kmeans folder, whereas the default KMeans class writes a result folder within the kmeans folder and the files with points are written inside that folder.
I give the necessary paths in the arguments when running.
E.g.: file:///home/softwares/flink-0.10.0/kmeans/points file:///home/softwares/flink-0.10.0/kmeans/centers file:///home/softwares/flink-0.10.0/kmeans/result 10

Now, after I create runnable jar files for the KMeans and outlierDetection classes and upload them to the Flink web submission client, it works fine for KMeans.jar: the folder and files get created. But in the case of outlierDetection.jar, no file or folder gets written inside kmeans.

How is it that the outlier class is able to write the file via Eclipse, but the outlier jar is not able to write via the Flink web submission client?


Best Regards,
Subash Basnet


Re: How to convert List to flink DataSet

Fabian Hueske-2
I would try to do the outlier computation with the DataSet API instead of fetching the results to the client with collect().
If you do that, you can directly use writeAsCsv because the result is still a DataSet.

What you have to do is translate your findOutliers method into DataSet API code.
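
Editor's note: as a starting point for such a translation, it may help to restructure findOutliers into a grouping step and a per-group step, since that is roughly the shape a grouped operation takes. The sketch below is plain Java, not DataSet API code. `Sample` is a hypothetical record holding a cluster id and an already computed distance to the centroid, and the rule is the same mean plus or minus two (population) standard deviations used in the thread, with an added guard for empty groups.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupedOutlierSketch {
    // Hypothetical record: a point's cluster id and its distance to that cluster's centroid.
    static final class Sample {
        final int clusterId;
        final double distance;

        Sample(int clusterId, double distance) {
            this.clusterId = clusterId;
            this.distance = distance;
        }
    }

    // Per-group step: flags each distance lying outside mean +/- 2 standard deviations.
    static List<Boolean> flagGroup(List<Double> distances) {
        List<Boolean> flags = new ArrayList<>();
        if (distances.isEmpty()) {
            return flags; // avoids dividing by zero for an empty cluster
        }
        double sum = 0;
        for (double d : distances) {
            sum += d;
        }
        double mean = sum / distances.size();
        double squared = 0;
        for (double d : distances) {
            squared += (d - mean) * (d - mean);
        }
        double sd = Math.sqrt(squared / distances.size());
        for (double d : distances) {
            flags.add(d < mean - 2 * sd || d > mean + 2 * sd);
        }
        return flags;
    }

    // Grouping step: collect distances per cluster id, then apply flagGroup to each group.
    static Map<Integer, List<Boolean>> flagByCluster(List<Sample> samples) {
        Map<Integer, List<Double>> groups = new LinkedHashMap<>();
        for (Sample s : samples) {
            groups.computeIfAbsent(s.clusterId, k -> new ArrayList<>()).add(s.distance);
        }
        Map<Integer, List<Boolean>> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<Double>> e : groups.entrySet()) {
            result.put(e.getKey(), flagGroup(e.getValue()));
        }
        return result;
    }
}
```

In this form `flagGroup` only ever sees one cluster's values, so it does not need the whole data set in one place; how exactly it maps onto DataSet operators is left open here.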

Best, Fabian

2016-02-10 18:29 GMT+01:00 subash basnet <[hidden email]>:
Hello Fabian,

As written before code:
DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
fElements.writeAsCsv(outputPath, "\n", " ");
env.execute("KMeans Example");

I am very new to flink so not so clear about what you suggested, by option(1) you meant that I write my own FileWriter here rather than using writeAsCsv() method. And option(2) I couldn't understand where to compute the outlier. I would want to use the writeAsCsv() method but currently it doesn't perform the write operation and unable to understand why.

An interesting thing I found is, when I run the outlierDetection class from eclipse a single file result gets written within the kmeans folder, whereas in case of default KMeans class it writes a result folder within the kmeans folder and the files with points are written inside the result folder. 
I give the necessary path in the arguments while running.
Eg: file:///home/softwares/flink-0.10.0/kmeans/points file:///home/softwares/flink-0.10.0/kmeans/centers file:///home/softwares/flink-0.10.0/kmeans/result 10

Now, after I create runnable jar files for the KMeans and outlierDetection classes and upload them to the Flink web submission client, KMeans.jar works fine: the folder and files get created. But in the case of outlierDetection.jar, no file or folder gets written inside kmeans.

How is it that the outlier class is able to write the file via Eclipse, but the outlier jar is not able to write via the Flink web submission client?


Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 1:58 PM, Fabian Hueske <[hidden email]> wrote:
Hi Subash,

I would not fetch the data to the client, do the computation there, and send it back, just for the purpose of writing it to a file.

Either 1) pull the results to the client and write the file from there or 2) compute the outliers in the cluster.
I did not study your code completely, but the two nested loops and the condition amount to a join, for example.

I would go for option 2, if possible.

Best, Fabian
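
A minimal sketch of what option 1 could look like, assuming the rows have already been pulled to the client (in this thread, the list returned by findOutliers). It is plain Java without Flink: `ClientSideCsv` is a hypothetical helper, `Object[]` stands in for the Tuple3 fields, and the space delimiter mirrors the writeAsCsv(outputPath, "\n", " ") call used in the thread.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

final class ClientSideCsv {

    /** Turns each row into one line, joining the fields' toString() values with a space. */
    static List<String> toLines(List<Object[]> rows) {
        List<String> lines = new ArrayList<>();
        for (Object[] row : rows) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < row.length; i++) {
                if (i > 0) {
                    sb.append(' ');
                }
                sb.append(row[i]);
            }
            lines.add(sb.toString());
        }
        return lines;
    }

    /** Writes the rows to a single local file on the client, one row per line. */
    static void write(Path target, List<Object[]> rows) throws IOException {
        Files.write(target, toLines(rows), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] {1, "0.5 0.7", true});
        rows.add(new Object[] {2, "3.0 4.0", false});
        Path target = Files.createTempFile("outliers", ".csv");
        write(target, rows);
        System.out.println("wrote " + target);
    }
}
```

This writes on the machine where the client program runs, which is why option 2 is the better fit once the job is submitted to a cluster.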


2016-02-10 13:07 GMT+01:00 subash basnet <[hidden email]>:
Hello Fabian,

I use the collect() method to get the elements locally and perform operations on that and return the result as a collection. The collection result is converted to the DataSet in the calling method. 
Below is the code of findOutliers method:

public static List<Tuple3> findOutliers(DataSet<Tuple2<Integer, Point>> clusteredPoints,
        DataSet<Centroid> centroids) throws Exception {
    List<Tuple3> finalElements = new ArrayList<Tuple3>();
    List<Tuple2<Integer, Point>> elements = clusteredPoints.collect();
    List<Centroid> centroidList = centroids.collect();
    List<Tuple3<Centroid, Tuple2<Integer, Point>, Double>> elementsWithDistance =
            new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
    for (Centroid centroid : centroidList) {
        elementsWithDistance = new ArrayList<Tuple3<Centroid, Tuple2<Integer, Point>, Double>>();
        double totalDistance = 0;
        int elementsCount = 0;
        for (Tuple2<Integer, Point> e : elements) {
            // compute distance
            if (e.f0 == centroid.id) {
                Tuple3<Centroid, Tuple2<Integer, Point>, Double> newElement =
                        new Tuple3<Centroid, Tuple2<Integer, Point>, Double>();
                double distance = e.f1.euclideanDistance(centroid);
                totalDistance += distance;
                newElement.setFields(centroid, e, distance);
                elementsWithDistance.add(newElement);
                elementsCount++;
            }
        }
        // finding mean
        double mean = totalDistance / elementsCount;
        double sdTotalDistanceSquare = 0;
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            double distanceSquare = Math.pow(mean - elementWithDistance.f2, 2);
            sdTotalDistanceSquare += distanceSquare;
        }
        double sd = Math.sqrt(sdTotalDistanceSquare / elementsCount);
        double upperlimit = mean + 2 * sd;
        double lowerlimit = mean - 2 * sd;
        // true = outlier
        Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
        for (Tuple3<Centroid, Tuple2<Integer, Point>, Double> elementWithDistance : elementsWithDistance) {
            newElement = new Tuple3<Integer, Point, Boolean>();
            if (elementWithDistance.f2 < lowerlimit || elementWithDistance.f2 > upperlimit) {
                // set as outlier
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, true);
            } else {
                newElement.setFields(elementWithDistance.f1.f0, elementWithDistance.f1.f1, false);
            }
            finalElements.add(newElement);
        }
    }
    return finalElements;
}

I have attached a screenshot of my project structure and the KMeansOutlierDetection.java file for more clarity.


Best Regards,
Subash Basnet

On Wed, Feb 10, 2016 at 12:26 PM, Fabian Hueske <[hidden email]> wrote:
Hi Subash,

How is findOutliers implemented?

It might be that you are mixing up local and cluster computation. All DataSets are processed in the cluster. Please note the following:
- ExecutionEnvironment.fromCollection() transforms a client-local collection into a DataSet by serializing it and sending it to the cluster.
- DataSet.collect() transforms a DataSet into a collection and ships it back to the client.

So, does findOutliers operate on the cluster or on the local client, i.e., does it work with DataSet and send the result back as a collection or does it first collect the results as collection and operate on these?

Best, Fabian

2016-02-10 12:13 GMT+01:00 subash basnet <[hidden email]>:
Hello Stefano,

Yes, the type casting worked, thank you. But I am not able to write the DataSet to the file.

The default code below, which writes the KMeans points along with their centroid numbers to the file, works fine:
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
DataSet<Tuple2<Integer, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
if (fileOutput) {
    clusteredPoints.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

But my modified code below, which finds the outliers, does not:
// feed new centroids back into next iteration
DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
DataSet<Tuple2<Integer, Point>> clusteredPoints = points
        // assign points to final clusters
        .map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
DataSet<Tuple3> fElements = env.fromCollection(findOutliers(clusteredPoints, finalCentroids));
if (fileOutput) {
    fElements.writeAsCsv(outputPath, "\n", " ");
    // since file sinks are lazy, we trigger the execution explicitly
    env.execute("KMeans Example");
}

It is not writing to the file: the result folder does not get created inside the kmeans folder where my centers and points files are located. I am only able to print it to the console via fElements.print();

Does it have something to do with env.execute(""), which must be set somewhere in the previous case but not in my case?



Best Regards,
Subash Basnet


On Tue, Feb 9, 2016 at 6:29 PM, Stefano Baghino <[hidden email]> wrote:
Assuming your ExecutionEnvironment is named `env`, simply call:

DataSet<Tuple3<Integer, Point, Boolean>> fElements = env.fromCollection(finalElements);

Does this help?

On Tue, Feb 9, 2016 at 6:06 PM, subash basnet <[hidden email]> wrote:
Hello all,

I have performed a modification in KMeans code to detect outliers. I have printed the output in the console but I am not able to write it to the file using the given 'writeAsCsv' method. 
The problem is I generate a list of tuples. 
My List is:
List<Tuple3> finalElements = new ArrayList<Tuple3>();
Following is the datatype of the elements added to the list:
Tuple3<Integer, Point, Boolean> newElement = new Tuple3<Integer, Point, Boolean>();
finalElements.add(newElement);
Now I am stuck on how to convert this 'finalElements' to DataSet<Tuple3<Integer, Point, Boolean>> fElements, 
so that I could use 
fElements.writeAsCsv(outputPath, "\n"," ");

Best Regards,
Subash Basnet



--
BR,
Stefano Baghino

Software Engineer @ Radicalbit









Re: How to convert List to flink DataSet

subashbasnet
Hello Fabian,

Thank you for the response, but I am stuck on how to iterate over a DataSet, perform operations, and return a new modified DataSet, similar to the list operations shown below.
E.g., currently I am doing the following:
for (Centroid centroid : centroids.collect()) {
    for (Tuple2<Integer, Point> element : clusteredPoints.collect()) {
        // perform necessary operations
    }
    // add elements
}
// return elements list

It would be really nice if I could just get started. 

I have been trying to add elements to a DataSet using join, but when I print the DataSet it contains only the one initial element; it prints the same value it was initialized with.
for(....){
newElement = new Tuple3<Integer, Point, Boolean>();
dataSetElement.join(env.fromElements(newElement));
dataSetElement.print();
}

I am unsure whether I am using the right function or using join in the wrong manner.

Best Regards,
Subash Basnet
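
A minimal sketch of why the loop above keeps printing only the initial element, using plain Java lists purely as an analogy for a DataSet. DataSet transformations such as join return a new DataSet and leave the one they are called on unchanged, so a result that is never assigned is simply lost. The class `ImmutableAppendSketch` and its `union` helper are hypothetical; in the DataSet API the operation that concatenates two DataSets of the same type is union, while join pairs elements by key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ImmutableAppendSketch {

    /** Returns a new list holding base followed by extra; base itself is not modified. */
    static <T> List<T> union(List<T> base, List<T> extra) {
        return Stream.concat(base.stream(), extra.stream()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> elements = Arrays.asList("initial");

        // Result discarded: 'elements' still holds only the initial entry.
        union(elements, Arrays.asList("new"));
        System.out.println(elements);

        // Result kept: the variable now refers to the combined list.
        elements = union(elements, Arrays.asList("new"));
        System.out.println(elements);
    }
}
```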

On Wed, Feb 10, 2016 at 6:33 PM, Fabian Hueske <[hidden email]> wrote:
I would try to do the outlier computation with the DataSet API instead of fetching the results to the client with collect().
If you do that, you can directly use writeAsCsv because the result is still a DataSet.

What you have to do is translate your findOutliers method into DataSet API code.

Best, Fabian
