Best way to compute the difference between 2 datasets

Best way to compute the difference between 2 datasets

Juan Rodríguez Hortalá
Hi,

I've been trying to write a function that computes the difference between two datasets, that is, a dataset containing all the elements of one dataset that are not present in another. I first tried coGroup, but it was very slow in a local execution environment and often crashed with an OOM error. Then I tried leftOuterJoin and got similar results. I then tried the following:

private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {
  // Tag each element with its origin: true = from self, false = from other
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))

  val all = selfMarked.union(otherMarked)
    .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
    .sortPartition[(T, Boolean)](identity, Order.ASCENDING)
  all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
    var latestOtherOpt: Option[T] = None
    partitionIter.foreach {
      case (otherElem, false) => latestOtherOpt = Some(otherElem)
      case (selfElem, true) =>
        // false sorts before true, so an equal element from other has already been seen
        if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
    }
  }
}

This is basically the usual technique for removing duplicates from a collection: sort it, then traverse it from beginning to end, dropping each element that immediately follows an equal element. Here it is extended to mark whether an element comes from `self` or from `other`, keeping only the elements from `self` that do not follow an occurrence of the same element in `other`. That code is also really slow in a local execution environment, and crashes a lot. But when I replace `sortPartition` with sorting each partition in memory inside a mapPartition, it works fine in the local execution environment:

private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {
  // Tag each element with its origin: true = from self, false = from other
  val selfMarked: DataSet[(T, Boolean)] = self.map((_, true))
  val otherMarked: DataSet[(T, Boolean)] = other.map((_, false))
  val all = selfMarked.union(otherMarked)
    .partitionByHash(0) // so occurrences of the same value in both datasets go to the same partition
  all.mapPartition[T] { (partitionIter: Iterator[(T, Boolean)], collector: Collector[T]) =>
    // Materialize the whole partition on the heap and sort it there
    val sortedPartition = {
      val partition = partitionIter.toArray
      scala.util.Sorting.quickSort(partition)
      partition
    }
    var latestOtherOpt: Option[T] = None
    sortedPartition.foreach {
      case (otherElem, false) => latestOtherOpt = Some(otherElem)
      case (selfElem, true) =>
        // false sorts before true, so an equal element from other has already been seen
        if (latestOtherOpt != Some(selfElem)) collector.collect(selfElem)
    }
  }
}
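
For reference, the sort-then-scan idea shared by both functions can be tried outside Flink on plain Scala collections. This is only a minimal sketch: the object name `SortScanDifferenceSketch` is hypothetical, and it assumes an `Ordering[T]` is available, which the tuple sort needs.

```scala
object SortScanDifferenceSketch {
  // Returns the elements of `self` that have no equal element in `other`.
  def minus[T: Ordering](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Mark the origin of each element: true = from `self`, false = from `other`.
    val marked: Seq[(T, Boolean)] = self.map((_, true)) ++ other.map((_, false))
    // Tuples sort by element first, then by flag. Since false < true, an element
    // from `other` comes right before the equal elements from `self`.
    val sorted = marked.sorted
    var latestOther: Option[T] = None
    val result = Seq.newBuilder[T]
    sorted.foreach {
      case (elem, false) => latestOther = Some(elem)
      case (elem, true)  => if (!latestOther.contains(elem)) result += elem
    }
    result.result()
  }
}
```

For example, `SortScanDifferenceSketch.minus(Seq(5, 1, 3, 3, 2), Seq(3, 4, 1))` yields `Seq(2, 5)`. Duplicates in `self` are kept when the value is absent from `other` and all dropped when it is present.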

I'm surprised by such a big difference. This is my code, and a test I use for running this. I'm very surprised to see these performance issues with such small DataSet sizes, fewer than 20 elements. Is this because I'm running the program in a local execution environment? Are operations like coGroup, leftOuterJoin or sortPartition implemented inefficiently in the local environment? If so, is there an alternative environment recommended for development on a single machine, where I won't experience these issues with those operations? Should I expect the function `minussWithSortPartition` above to run efficiently on a cluster? Or maybe there is something wrong with my code? Are there any plans to provide a built-in minus operator in future versions of Flink?

Thanks,

Juan


Re: Best way to compute the difference between 2 datasets

Ken Krugler
Hi Juan,

If you want to deduplicate, then you could group by the record, and use a (very simple) reduce function to only emit a record if the group contains one element.

There will be performance issues, though - Flink will have to generate all groups first, which typically means spilling to disk if the data set has any significant size.

— Ken

PS - I assume that you’ve implemented a valid hashCode()/equals() for the record.
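
A grouping-based variant of this idea, adapted from deduplication to set difference, might look like the following plain Scala sketch. It is an illustration on in-memory collections, not Flink code: the object name `GroupingDifferenceSketch` is hypothetical, and instead of checking that a group has exactly one element it checks that the group contains no element coming from the second dataset. Grouping relies on `equals`/`hashCode` of the record type.

```scala
object GroupingDifferenceSketch {
  // Returns the elements of `self` whose group contains no element from `other`.
  // The order of the result is unspecified because grouping uses a hash map.
  def minus[T](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Tag each record with its origin: true = from `self`, false = from `other`.
    val tagged: Seq[(T, Boolean)] = self.map((_, true)) ++ other.map((_, false))
    tagged
      .groupBy(_._1)                        // one group per distinct record
      .valuesIterator
      .filter(group => group.forall(_._2))  // keep groups made only of `self` records
      .flatMap(group => group.map(_._1))    // emit the records of the surviving groups
      .toList
  }
}
```

As with a grouped reduce in Flink, every group has to be built before anything is emitted, which is where the cost mentioned above comes from.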


(Replying to Juan Rodríguez Hortalá's message of Jul 22, 2019, 8:29 AM.)


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Re: Best way to compute the difference between 2 datasets

Juan Rodríguez Hortalá
Hi Ken,

Thanks for the suggestion; that idea should also work for implementing a dataset difference operation, which is what concerns me here. However, I was also curious about why there is such a large performance difference between using sortPartition and sorting each partition in memory, for datasets as small as 20 elements running in local mode. For datasets of that size I would expect no relevant performance difference, but with sortPartition the program crashes, so I must be doing something wrong here.

Thanks in any case for the idea.

Greetings,

Juan

(Replying to Ken Krugler's message of Mon, Jul 22, 2019, 8:49 AM.)

Re: Best way to compute the difference between 2 datasets

Fabian Hueske-2
Hi Juan,

Both the local execution environment and the remote execution environment run the same code to execute the program.
The sortPartition operator was designed to scale to data sizes that exceed the available memory.
Internally, it serializes all records into byte arrays and sorts the serialized data. This is of course more expensive than keeping all objects on the heap and sorting them there.
Hence, a certain performance difference is to be expected. However, the program should not fail.
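
To illustrate what sorting serialized data involves, here is a minimal plain Scala sketch. It is not Flink's implementation: the object `SerializedSortSketch`, its methods, and the choice of a sign-flipped big-endian encoding for `Int` are assumptions made only for this example. It shows the extra encode, byte-wise compare, and decode work that an on-heap sort of objects avoids.

```scala
import java.nio.ByteBuffer

object SerializedSortSketch {
  // Encode an Int as 4 big-endian bytes with the sign bit flipped, so that
  // unsigned lexicographic byte order matches numeric order.
  def encode(value: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(value ^ Int.MinValue).array()

  // Inverse of `encode`.
  def decode(bytes: Array[Byte]): Int =
    ByteBuffer.wrap(bytes).getInt ^ Int.MinValue

  // Compare two byte arrays position by position, treating bytes as unsigned.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    var i = 0
    while (i < a.length && i < b.length) {
      val cmp = (a(i) & 0xff) - (b(i) & 0xff)
      if (cmp != 0) return cmp
      i += 1
    }
    a.length - b.length
  }

  // Serialize every record, sort the serialized form, then deserialize.
  def sortSerialized(values: Seq[Int]): Seq[Int] =
    values.map(encode).sortWith((a, b) => compareBytes(a, b) < 0).map(decode)
}
```

The result equals `values.sorted`, but every record is copied into a byte array and back, and each comparison walks bytes instead of comparing two values directly.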

What's the magnitude of the performance difference?
Can you post a stack trace of the error?

Thanks,
Fabian

(Replying to Juan Rodríguez Hortalá's message of Mon, Sept 16, 2019, 13:51.)

Re: Best way to compute the difference between 2 datasets

Fabian Hueske-2
By the way, there is a set difference (minus) operator in the Table API [1] that might be helpful.
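
As far as I understand the Table API documentation, it offers two variants: `minus`, which behaves like SQL EXCEPT and returns each surviving record once, and `minusAll`, which behaves like EXCEPT ALL and returns a record that occurs n times on the left and m times on the right n - m times. The plain Scala sketch below only illustrates these two semantics on in-memory sequences; the object name `MinusSemanticsSketch` is hypothetical and no Flink API is called.

```scala
object MinusSemanticsSketch {
  // EXCEPT-style: distinct records of `left` that do not occur in `right`.
  def minus[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val rightSet = right.toSet
    left.distinct.filterNot(rightSet.contains)
  }

  // EXCEPT ALL-style: multiset difference. A record occurring n times in `left`
  // and m times in `right` is returned max(n - m, 0) times.
  def minusAll[T](left: Seq[T], right: Seq[T]): Seq[T] =
    left.diff(right)
}
```

For `left = Seq(1, 1, 1, 2, 3)` and `right = Seq(1, 3, 3)`, `minus` gives `Seq(2)` and `minusAll` gives `Seq(1, 1, 2)`. Neither matches the functions earlier in this thread exactly, which keep every duplicate of a value absent from the other dataset and drop every duplicate of a value present in it.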


(Replying to Fabian Hueske's message of Fri, Sept 20, 2019, 15:30.)