Good day everyone,
I am looking for a good way to do the following: I have dataset A and dataset B, and for each element in dataset A I would like to filter dataset B and obtain the size of the result. In short:

for each element a in A -> B.filter( _ < a.propertyX).count

Currently I am doing a cross of datasets A and B to build tuples, so that I can filter for the tuples where field2 < field1.propertyX, then group by field1.id and take the sizes of the groups. However, this is not working out in practice: when the datasets get larger, some tasks hang on the CHAIN Cross -> Filter, probably because there is insufficient memory for the cross to be completed.

Does anyone have a suggestion on how I could make this work, especially with datasets that are larger than the memory available to a single task?

Thank you in advance for your time :-)

Kind regards,

Pieter Hameete
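For reference, this is roughly what the cross-based job looks like in the Scala DataSet API. The record type ARecord(id, propertyX) and the use of DataSet[Double] for B are made-up stand-ins for the real data:

import org.apache.flink.api.scala._

// hypothetical record type for dataset A; dataset B is assumed to hold plain Doubles
case class ARecord(id: Long, propertyX: Double)

def crossCount(a: DataSet[ARecord], b: DataSet[Double]): DataSet[(Long, Long)] =
  a.cross(b)                                       // materializes |A| x |B| pairs
    .filter { case (rec, v) => v < rec.propertyX } // keep the pairs that should be counted
    .map { case (rec, _) => (rec.id, 1L) }
    .groupBy(0)
    .sum(1)                                        // group size = count per element of A
// note: elements of A whose filter result is empty do not appear in the output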
Hi Pieter,

cross is indeed too expensive for this task.

If dataset A fits into memory, you can do the following: use a RichMapPartitionFunction to process dataset B and add dataset A as a broadcast set. In the open() method of the mapPartition function, you load the broadcast set, sort it by a.propertyX, and initialize a long[] for the counts. For each element of dataset B, you do a binary search on the sorted dataset A and increase the counts of all elements of A above that position (i.e., those whose propertyX is greater than the element of B). After all elements of dataset B have been processed, return the counts from the long[].

If dataset A doesn't fit into memory, things become more cumbersome and we need to play some tricks with range partitioning...

Let me know if you have questions,
Fabian
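A rough sketch of this broadcast-A variant in the Scala DataSet API, reusing the hypothetical ARecord(id, propertyX) type and DataSet[Double] for B from the sketch above. With parallelism > 1 each task instance only sees one partition of B, so the partial counts are summed per id at the end:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

def broadcastACount(a: DataSet[ARecord], b: DataSet[Double]): DataSet[(Long, Long)] =
  b.mapPartition(new RichMapPartitionFunction[Double, (Long, Long)] {
      private var sortedA: Array[ARecord] = _  // broadcast copy of A, sorted by propertyX
      private var counts: Array[Long] = _      // one partial count per element of A

      override def open(parameters: Configuration): Unit = {
        sortedA = getRuntimeContext.getBroadcastVariable[ARecord]("datasetA")
          .asScala.toArray.sortBy(_.propertyX)
        counts = new Array[Long](sortedA.length)
      }

      override def mapPartition(values: java.lang.Iterable[Double],
                                out: Collector[(Long, Long)]): Unit = {
        for (v <- values.asScala) {
          // binary search for the first element of A with propertyX > v;
          // every element of A from that position onwards counts v
          var lo = 0
          var hi = sortedA.length
          while (lo < hi) {
            val mid = (lo + hi) >>> 1
            if (sortedA(mid).propertyX <= v) lo = mid + 1 else hi = mid
          }
          var i = lo
          while (i < sortedA.length) { counts(i) += 1; i += 1 }
        }
        // emit the partial counts for this partition of B
        sortedA.indices.foreach(i => out.collect((sortedA(i).id, counts(i))))
      }
    })
    .withBroadcastSet(a, "datasetA")
    .groupBy(0)
    .sum(1) // sum the partial counts from all parallel partitions of B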
Hi Fabian,

thanks for your tips!

Do you have some pointers for getting started with the 'tricky range partitioning'? I am quite keen to get this working with large datasets ;-)

Cheers,

Pieter
The idea is to partition both datasets by range. Assume your dataset A is [1,2,3,4,5,6]; you create two partitions, p1: [1,2,3] and p2: [4,5,6]. Each partition is given to a different instance of a MapPartition operator (this is a bit tricky, because you cannot use a broadcast set for it; you could, for example, load the corresponding partition in the open() function from HDFS).

Dataset B is partitioned in the same way, i.e., all elements <= 3 go to partition 1 and everything > 3 goes to p2. You can partition a dataset by range using the partitionCustom() function. The partitioned dataset B is given to the mapPartition operator that loaded a partition of dataset A in each task instance. You do the counting just like before (sorting the partition of dataset A, binary search, long[]), but add an additional count for the complete partition (basically counting all elements that arrive in the task instance).

If you have a dataset B with 1,2,2,3,3,4,5,5,5,5,6,7, the counts for p1 would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7]. Now you need to compute the final counts by adding the "all" counts of the lower partitions to the counts of the higher partitions, i.e., add the all:5 of p1 to all counts of p2.

This approach requires knowing the value range and distribution of the values, which makes it a bit difficult. I guess you'll get the best performance if you partition such that you have roughly equally sized partitions of dataset B, with the constraint that the corresponding partitions of A fit into memory.

As I said, it's a bit cumbersome. I hope you could follow my explanation. Please ask if something is not clear ;-)
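To make the partitioning step concrete, here is a minimal sketch of partitionCustom() with a custom range Partitioner, assuming the split points are known up front (the Double element type and the single split at 3.0 simply mirror the example above):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// hypothetical split points: values <= splits(i) go to partition i, the rest to the last partition;
// in practice these must be chosen from knowledge of the value range and distribution,
// and the parallelism should match the number of partitions (here: 2)
val splits = Array(3.0)

val rangePartitioner = new Partitioner[Double] {
  override def partition(key: Double, numPartitions: Int): Int = {
    var p = 0
    while (p < splits.length && key > splits(p)) p += 1
    p // must be < numPartitions
  }
}

// route every element of B to the task instance that holds the matching slice of A
def rangePartitionB(b: DataSet[Double]): DataSet[Double] =
  b.partitionCustom(rangePartitioner, (v: Double) => v)

// The mapPartition operator applied to the partitioned B would then, in open(), load "its"
// slice of A (e.g. from HDFS, selected via getRuntimeContext.getIndexOfThisSubtask), count
// as in the broadcast sketch above, and additionally emit the total number of B elements it
// saw, so that the "all" counts of lower partitions can be added to the higher ones.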
Hello,
Alternatively, if dataset B fits in memory but dataset A doesn't, then you can do it the other way around: broadcast B to a RichMapPartitionFunction on A. In the open method of mapPartition you sort B. Then, for each element of A, you do a binary search in B; the index found by the binary search is the count that you are looking for.

Best,

Gabor
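A sketch of this broadcast-B variant, again with the hypothetical ARecord(id, propertyX) type from the first sketch. Each element of A directly yields its final count, so no aggregation step is needed afterwards:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

def broadcastBCount(a: DataSet[ARecord], b: DataSet[Double]): DataSet[(Long, Long)] =
  a.mapPartition(new RichMapPartitionFunction[ARecord, (Long, Long)] {
      private var sortedB: Array[Double] = _ // broadcast copy of B, sorted ascending

      override def open(parameters: Configuration): Unit =
        sortedB = getRuntimeContext.getBroadcastVariable[Double]("datasetB")
          .asScala.toArray.sorted

      override def mapPartition(values: java.lang.Iterable[ARecord],
                                out: Collector[(Long, Long)]): Unit =
        for (rec <- values.asScala) {
          // index of the first B value >= propertyX == number of B values < propertyX
          var lo = 0
          var hi = sortedB.length
          while (lo < hi) {
            val mid = (lo + hi) >>> 1
            if (sortedB(mid) < rec.propertyX) lo = mid + 1 else hi = mid
          }
          out.collect((rec.id, lo.toLong))
        }
    })
    .withBroadcastSet(b, "datasetB")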
Hi Gabor, Fabian,

thank you for your suggestions. I am intending to scale up to the point where I'm sure that neither A nor B will fit in memory. I'll see if I can come up with a nice way to partition the datasets, but if that takes too much time I'll just have to accept that it won't work on large datasets.

I'll let you know if I manage to work something out, but I won't work on it until the weekend :-)

Cheers again,

Pieter
Hi Fabian,

I have a question regarding the first approach. Is there a benefit to choosing a RichMapPartitionFunction over a RichMapFunction in this case? I assume that each broadcast dataset is sent only once to each task manager?

If I were to broadcast dataset B, then for each element a in A I could count the number of elements in B that are smaller than a.propertyX and output a tuple in a map operation. That would also save me a step in aggregating the results?

Kind regards,

Pieter
Hi Pieter,

a FlatMapFunction can only return values when its flatMap() method is called. In your use case, however, you would like to return values *after* the function was called for the last time. This is not possible with a FlatMapFunction, because you cannot identify the last flatMap() call.

The broadcast set is sent only once in both cases. If it is possible to broadcast dataset B, you can also use a MapFunction and don't need to store the count values.

Fabian
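With B broadcast, the map-based variant of the earlier sketch would look roughly like this (again using the hypothetical ARecord type); each map() call emits one finished (id, count) pair, so nothing has to be stored or aggregated afterwards:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

def mapWithBroadcastB(a: DataSet[ARecord], b: DataSet[Double]): DataSet[(Long, Long)] =
  a.map(new RichMapFunction[ARecord, (Long, Long)] {
      private var sortedB: Array[Double] = _

      // open() runs once per task instance, so B is sorted once and reused for every element of A
      override def open(parameters: Configuration): Unit =
        sortedB = getRuntimeContext.getBroadcastVariable[Double]("datasetB")
          .asScala.toArray.sorted

      override def map(rec: ARecord): (Long, Long) = {
        var lo = 0
        var hi = sortedB.length
        while (lo < hi) {
          val mid = (lo + hi) >>> 1
          if (sortedB(mid) < rec.propertyX) lo = mid + 1 else hi = mid
        }
        (rec.id, lo.toLong)
      }
    })
    .withBroadcastSet(b, "datasetB")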