Hi,

I've been trying to write a function to compute the difference between two datasets. By that I mean computing a dataset that contains all the elements of one dataset that are not present in another dataset. I first tried using coGroup, but it was very slow in a local execution environment and often crashed with an OOM error. Then I tried leftOuterJoin and got similar results. I then tried the following:

private[this] def minussWithSortPartition(other: DataSet[T]): DataSet[T] = {

This is basically the idea of removing duplicates in a collection by first sorting it and then traversing it from beginning to end, removing each element that is equal to the one we just saw. Here that idea is extended to mark whether an element comes from `self` or from `other`, keeping only the elements from `self` that do not follow an occurrence of the same element in `other`.

That code is also really slow in a local execution environment, and crashes a lot. But when I replace `sortPartition` with sorting each partition in memory inside a mapPartition, it works fine in the local execution environment.

private[this] def minusWithInMemoryPartition(other: DataSet[T]): DataSet[T] = {

I'm surprised by such a big difference. This is my code, and a test I use for running this. I'm very surprised by these performance issues with such small DataSet sizes, with fewer than 20 elements. Is this because I'm running the program in a local execution environment? Are operations like coGroup, leftOuterJoin or sortPartition implemented inefficiently in the local environment? If that is the case, is there any other environment recommended for development on a single machine where I won't experience these issues with those operations?

Should I expect the function `minussWithSortPartition` above to run efficiently on a cluster? Or maybe there is something wrong with my code? Are there any plans to provide a built-in minus operator in future versions of Flink?

Thanks,
Juan
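The body of `minussWithSortPartition` is not included above, so the following is only a sketch of the sort-and-scan idea described, modeled on a single partition with plain Scala collections rather than the Flink DataSet API. It assumes an `Ordering[T]` is available and that both inputs were already partitioned on the element (e.g. hash-partitioned), so equal values end up in the same partition; `SortScanMinus` and `minusPartition` are hypothetical names. Elements from `other` are tagged so they sort ahead of equal elements from `self`, which lets one pass drop every `self` element that follows a matching `other` element.

```scala
// Hypothetical model of the sort-and-scan difference over ONE partition,
// using plain Scala collections instead of the Flink DataSet API.
object SortScanMinus {
  // Origin tags: for equal values, elements from `other` (0) sort before
  // elements from `self` (1), so a matching `other` element is seen first.
  private val FromOther = 0
  private val FromSelf = 1

  /** Returns the elements of `self` whose value does not occur in `other`. */
  def minusPartition[T](self: Seq[T], other: Seq[T])(implicit ord: Ordering[T]): Seq[T] = {
    val tagged: Seq[(T, Int)] =
      self.map(x => (x, FromSelf)) ++ other.map(x => (x, FromOther))
    // Sort by value first, then by origin tag.
    val sorted = tagged.sorted(Ordering.Tuple2(ord, Ordering.Int))
    // Single scan, remembering the last value that came from `other`.
    var lastOther: Option[T] = None
    val out = Seq.newBuilder[T]
    for ((value, tag) <- sorted) {
      if (tag == FromOther) lastOther = Some(value)
      else if (!lastOther.exists(o => ord.equiv(o, value))) out += value
    }
    out.result()
  }
}
```

Duplicates inside `self` are kept and the output comes back in sorted order, e.g. `minusPartition(Seq(1, 1, 3), Seq(3))` yields `Seq(1, 1)`.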
Hi Juan,
If you want to deduplicate, you could group by the record and use a (very simple) reduce function that only emits a record if the group contains exactly one element. There will be performance issues, though: Flink will have to generate all groups first, which typically means spilling to disk if the data set has any significant size.

-- Ken

PS: I assume that you've implemented valid hashCode()/equals() methods for the record.
-------------------------- Ken Krugler +1 530-210-6378 http://www.scaleunlimited.com Custom big data solutions & training Flink, Solr, Hadoop, Cascading & Cassandra
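Ken's suggestion can be modeled with plain Scala collections (a hypothetical `GroupSingletons` helper, not Flink code): `groupBy(identity)` stands in for grouping the DataSet by the whole record, and the `collect` plays the role of the reduce function that emits a record only when its group has exactly one element. Like the Flink version, it relies on the record's `equals`/`hashCode`; the output order is unspecified.

```scala
// Hypothetical model of "group by the record, emit only singleton groups".
object GroupSingletons {
  /** Keeps only the records that occur exactly once in `records`. */
  def singletons[T](records: Seq[T]): Seq[T] =
    records
      .groupBy(identity)                                  // one group per distinct record
      .collect { case (r, group) if group.size == 1 => r } // emit only one-element groups
      .toSeq
}
```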
Hi Ken,

Thanks for the suggestion; that idea should also work for implementing a data set difference operation, which is what concerns me here. However, I was also curious about why there is such a large performance difference between using sortPartition and sorting each partition in memory, for data sets as small as 20 elements running in local mode. For data sets of that size I would expect no relevant performance difference, but with sortPartition the program crashes, so I must be doing something wrong here. Thanks in any case for the idea.

Greetings,
Juan

On Mon, Jul 22, 2019 at 8:49 AM Ken Krugler <[hidden email]> wrote:
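Extending the grouping idea to a difference, as Juan suggests, might look like the following sketch. It again uses plain Scala collections and a hypothetical `GroupMinus` helper rather than the Flink API: each element is tagged with its origin, equal elements are grouped together, and a group is emitted only if none of its members came from `other`. In Flink this would roughly correspond to a union of the two tagged data sets followed by a groupBy on the element and a group reduce, with the same cost caveat Ken mentions.

```scala
// Hypothetical model of a grouping-based difference: self minus other.
object GroupMinus {
  def minus[T](self: Seq[T], other: Seq[T]): Seq[T] = {
    // Tag each element with its origin: true = from `self`, false = from `other`.
    val tagged: Seq[(T, Boolean)] =
      self.map(x => (x, true)) ++ other.map(x => (x, false))
    tagged
      .groupBy(_._1)                       // one group per distinct element (equals/hashCode)
      .valuesIterator
      .filter(group => group.forall(_._2)) // drop groups containing any `other` element
      .flatMap(group => group.map(_._1))   // emit the remaining `self` copies
      .toSeq
  }
}
```

Group order is unspecified, so callers that need a stable order should sort the result.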
Hi Juan,

Both the local and the remote execution environment run the same code to execute the program. The implementation of the sortPartition operator was designed to scale to data sizes that exceed the available memory. Internally, it serializes all records into byte arrays and sorts the serialized data. This is of course more expensive than keeping all objects on the heap and sorting them there, so a certain performance difference is to be expected. However, the program should not fail. What's the magnitude of the performance difference? Can you post a stack trace of the error?

Thanks,
Fabian

On Mon, Sep 16, 2019 at 13:51, Juan Rodríguez Hortalá <[hidden email]> wrote:
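Fabian's point about sorting serialized data can be illustrated with a toy model. This is not Flink's actual sorter, which is considerably more elaborate and can spill to disk; `SerializedSort` is a hypothetical name. Each `Int` key is serialized to 4 big-endian bytes with its sign bit flipped, so that comparing the bytes as unsigned values gives numeric order. The extra encode, byte-compare and decode steps hint at why this costs more than sorting heap objects for tiny inputs, in exchange for not needing the objects on the heap.

```scala
import java.nio.ByteBuffer

// Toy model: sort Int keys by comparing their serialized byte form.
object SerializedSort {
  /** Big-endian bytes with the sign bit flipped: unsigned byte order == signed Int order. */
  def toBytes(x: Int): Array[Byte] =
    ByteBuffer.allocate(4).putInt(x ^ Int.MinValue).array()

  def fromBytes(b: Array[Byte]): Int =
    ByteBuffer.wrap(b).getInt ^ Int.MinValue

  /** Lexicographic comparison treating each byte as unsigned (0..255). */
  val unsignedLex: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      var i = 0
      while (i < a.length && i < b.length) {
        val c = (a(i) & 0xff) - (b(i) & 0xff)
        if (c != 0) return c
        i += 1
      }
      a.length - b.length
    }
  }

  /** Serialize, sort the byte arrays, then deserialize. */
  def sortViaBytes(xs: Seq[Int]): Seq[Int] =
    xs.map(toBytes).sorted(unsignedLex).map(fromBytes)
}
```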
Btw, there is a set difference (minus) operator in the Table API [1] that might be helpful.

On Fri, Sep 20, 2019 at 15:30, Fabian Hueske <[hidden email]> wrote:
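For reference, the Flink Table API documentation describes two such operators: `minus`, like SQL `EXCEPT`, returns the left records that are absent from the right table, each only once (duplicates removed), and `minusAll`, like SQL `EXCEPT ALL`, returns a record that appears n times on the left and m times on the right (n - m) times. The sketch below only models those two semantics on plain Scala collections (a hypothetical `ExceptSemantics` helper); it does not call Flink.

```scala
// Hypothetical model of EXCEPT vs EXCEPT ALL semantics on Scala collections.
object ExceptSemantics {
  /** SQL EXCEPT: distinct left records that do not occur on the right. */
  def except[T](left: Seq[T], right: Seq[T]): Seq[T] = {
    val r = right.toSet
    left.distinct.filterNot(r.contains)
  }

  /** SQL EXCEPT ALL: multiset difference, n left copies minus m right copies. */
  def exceptAll[T](left: Seq[T], right: Seq[T]): Seq[T] =
    left.diff(right)
}
```

Note that neither matches the sort-and-scan approach from the first message exactly: that one drops every copy of a value found in `other` but keeps all duplicates of the remaining values.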