spark vs flink batch performance


spark vs flink batch performance

capacman
Hi all,

I am trying to compare Spark and Flink batch performance. In my test I am using ratings.csv from the http://files.grouplens.org/datasets/movielens/ml-latest.zip dataset. I concatenated ratings.csv 16 times to increase the dataset size (390465536 records in total, almost 10 GB). I am reading from Google Cloud Storage with the gcs-connector, and the file schema is userId,movieId,rating,timestamp. Basically, I am calculating the average rating per movie.

Code for Flink (I tested CombineHint.HASH and CombineHint.SORT):
import java.sql.Timestamp
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint

case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)

def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}

val env = ExecutionEnvironment.getExecutionEnvironment

// Build (movieId, count, rating sum) per movie, then divide for the average.
val ratings: DataSet[Rating] = env.readTextFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(a => parseRating(a))
ratings
  .map(i => (i.movieID, 1, i.rating))
  .groupBy(0).reduce((l, r) => (l._1, l._2 + r._2, l._3 + r._3), CombineHint.HASH)
  .map(i => (i._1, i._3 / i._2))
  .collect().sortBy(_._1).sortBy(_._2)(Ordering.Double.reverse).take(10)

With CombineHint.HASH it took 3m49s and with CombineHint.SORT 5m9s.

Code for Spark (I tested reduceByKey and reduceByKeyLocally):
import java.sql.Timestamp
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

case class Rating(userID: String, movieID: String, rating: Double, date: Timestamp)
def parseRating(line: String): Rating = {
  val arr = line.split(",")
  Rating(arr(0), arr(1), arr(2).toDouble, new Timestamp(arr(3).toLong * 1000))
}
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
// Map to (movieId, (count, rating)), sum per key, then divide for the average.
val keyed: RDD[(String, (Int, Double))] = sc.textFile("gs://cpcflink/wikistream/ratingsheadless16x.csv").map(parseRating).map(r => (r.movieID, (1, r.rating)))
keyed.reduceByKey((l, r) => (l._1 + r._1, l._2 + r._2)).mapValues(i => i._2 / i._1)
  .collect.sortBy(_._1).sortBy(a => a._2)(Ordering.Double.reverse).take(10).foreach(println)

With reduceByKeyLocally it took 2.9 minutes (almost 2m54s) and with reduceByKey 3.1 minutes (almost 3m6s).

Machine config on Google Cloud:
jobmanager/sparkmaster: n1-standard-1 (1 vCPU, 3.75 GB memory)
taskmanagers/sparkworkers: n1-standard-2 (2 vCPUs, 7.5 GB memory)
Java version: JDK 8u102
Flink: 1.1.3
Spark: 2.0.2

I also attached my flink-conf.yaml. Although it is not a huge gap, there is roughly a 40% performance difference between Spark and Flink. Is there something I am doing wrong? If not, how can I fine-tune Flink, or is it normal for Spark to perform better on batch data?

Thank you in advance...

Attachment: flink-conf.yaml (7K)

Re: spark vs flink batch performance

capacman

Hi all,

In the meantime, I should mention that I have three workers. Any thoughts on improving Flink performance?

Thank you...



Re: spark vs flink batch performance

Gábor Gévay
In reply to this post by capacman
Hello,

Your program looks mostly fine, but there are a few minor things that
might help a bit:

Parallelism: In your attached flink-conf.yaml, you have 2 task slots
per task manager, and if you have 1 task manager, then your total
number of task slots is also 2. However, your default parallelism is
6. In Flink, the recommended default parallelism is exactly the total
number of task slots [1]. (This is in contrast to Spark, where the
recommended setting is 2-3 per CPU core [2].)
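
For example, with three task managers and 2 slots each, the relevant
flink-conf.yaml entries would be along these lines (the exact values
here are just an illustration):

taskmanager.numberOfTaskSlots: 2
parallelism.default: 6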

CSV reading: If your input is a CSV file, then you should use
readCsvFile (instead of readTextFile and then parsing it manually).
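
For example, a rough (untested) sketch of the read, assuming the four
columns userId,movieId,rating,timestamp and the default "," delimiter;
the rest of the pipeline (map / groupBy / reduce) can stay the same:

val ratings = env.readCsvFile[(String, String, Double, Long)](
  "gs://cpcflink/wikistream/ratingsheadless16x.csv")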

Collect call: How large is the DataSet that you are using collect on?
If it is large, then we might try to figure out a way to get the top
10 elements without first collecting the DataSet.
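
For example, one possible (untested, and not necessarily optimal) way
to take the top 10 on the cluster instead of collecting everything,
assuming a DataSet called averages holding (movieId, average rating)
pairs:

import org.apache.flink.api.common.operators.Order

averages
  .sortPartition(1, Order.DESCENDING).setParallelism(1) // one partition => globally sorted
  .first(10)
  .print()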

Best,
Gábor

[1] https://flink.apache.org/faq.html#what-is-the-parallelism-how-do-i-set-it
[2] https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism


Re: spark vs flink batch performance

capacman
Hi Gábor,

Thank you for your kind response. I forgot to mention that I actually have three workers; this is why I set the default parallelism to 6.

For CSV reading, I deliberately did not use the CSV reader, since I want to run the same code across Spark and Flink. Collect is returning about 40k records, which is not that big.

I will also run the same test with Spark 1.5 and 1.6 to see whether the Spark 2.x series brings performance improvements, because in this kind of test Spark and Flink used to be either on par or Flink 10-15% faster. Aside from that, are there any configuration parameters you would propose to fine-tune Flink?

Best,
Anıl


Re: spark vs flink batch performance

Flavio Pompermaier
I think this could be very helpful for your study:


Best,
Flavio


Re: spark vs flink batch performance

capacman
Thank you, Flavio. I will generate flame graphs for Flink and compare them.


Re: spark vs flink batch performance

Greg Hogan
In reply to this post by capacman
"For CSV reading, I deliberately did not use the CSV reader, since I want to run the same code across Spark and Flink."

If your objective deviates from writing and running the fastest Spark and fastest Flink programs, then your comparison is worthless.



Re: spark vs flink batch performance

Gábor Gévay
> "For CSV reading, I deliberately did not use the CSV reader, since I want
> to run the same code across Spark and Flink."
>
> If your objective deviates from writing and running the fastest Spark and
> fastest Flink programs, then your comparison is worthless.

Well, I don't really agree with this. It can actually be a valid
objective to compare different systems without tuning the code very
much to each individual system, because in real (non-benchmark) jobs
we also sometimes don't want to spend too much time on tuning.

However, in this case I also think that using the built-in CSV reader
would not constitute "too much tuning to a specific system", so I
would do this comparison using the built-in CSV reader in both
systems.
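
On the Spark side, a rough (untested) sketch of what the built-in CSV
reader could look like, assuming a SparkSession instead of the raw
SparkContext used above; the column names are just for illustration,
since the file has no header:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, desc}

val spark = SparkSession.builder().appName("ratings-csv").getOrCreate()

// Read the headerless CSV, name the columns, then average per movie.
val ratings = spark.read
  .option("inferSchema", "true")
  .csv("gs://cpcflink/wikistream/ratingsheadless16x.csv")
  .toDF("userId", "movieId", "rating", "timestamp")

ratings.groupBy("movieId")
  .agg(avg("rating").as("avgRating"))
  .orderBy(desc("avgRating"))
  .show(10)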

Best,
Gábor