For each element in a dataset, do something with another dataset

Pieter Hameete
Good day everyone,

I am looking for a good way to do the following:

I have dataset A and dataset B, and for each element in dataset A I would like to filter dataset B and obtain the size of the result. In short:

for each a in A -> B.filter(_ < a.propertyx).count

Currently I am doing a cross of datasets A and B to make tuples, so that I can then filter all tuples where field2 < field1.propertyx, group by field1.id, and get the sizes of the groups. However, this is not working out in practice: when the datasets get larger, some Tasks hang on the CHAIN Cross -> Filter, presumably because there is insufficient memory for the cross to complete.

Does anyone have a suggestion on how I could make this work, especially with datasets that are larger than the memory available to a single Task?

Thank you in advance for your time :-)

Kind regards,

Pieter Hameete

Re: For each element in a dataset, do something with another dataset

Fabian Hueske
Hi Pieter,

cross is indeed too expensive for this task.

If dataset A fits into memory, you can do the following: use a RichMapPartitionFunction to process dataset B, and add dataset A as a broadcast set. In the open() method of the mapPartition function, load the broadcast set, sort it by a.propertyX, and initialize a long[] for the counts. For each element of dataset B, do a binary search on the sorted dataset A and increase the counts of all elements of A whose propertyX is greater than that element. After all elements of dataset B have been processed, return the counts from the long[].
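
Here is a rough, untested sketch of this in the Scala API. The element shapes and names are made up for the example (datasetA as (id, propertyX) pairs of Longs, datasetB as plain Longs, broadcast name "A"); adapt them to your data. One small twist on the loop described above: instead of increasing every affected count per element of B, the sketch bumps a single histogram bucket and takes a prefix sum at the end, which yields the same counts with just the binary search per element:

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Counts, for every element of the broadcast dataset A, how many of the
// B elements seen by this parallel instance are smaller than its propertyX.
class CountSmallerPerA extends RichMapPartitionFunction[Long, (Long, Long)] {
  private var ids: Array[Long] = _    // ids of A, ordered by propertyX
  private var props: Array[Long] = _  // propertyX values, ascending
  private var hits: Array[Long] = _   // histogram of binary-search positions

  override def open(parameters: Configuration): Unit = {
    val a = getRuntimeContext
      .getBroadcastVariable[(Long, Long)]("A")
      .asScala.toArray.sortBy(_._2)
    ids = a.map(_._1)
    props = a.map(_._2)
    hits = new Array[Long](a.length + 1)
  }

  override def mapPartition(bs: java.lang.Iterable[Long],
                            out: Collector[(Long, Long)]): Unit = {
    for (b <- bs.asScala) {
      // Position of the first element of A (sorted by propertyX) with
      // b < propertyX: b is counted by every element from here onwards.
      var lo = 0
      var hi = props.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (props(mid) <= b) lo = mid + 1 else hi = mid
      }
      hits(lo) += 1
    }
    // A prefix sum over the histogram yields the per-element counts.
    var running = 0L
    var i = 0
    while (i < ids.length) {
      running += hits(i)
      out.collect((ids(i), running))
      i += 1
    }
  }
}

// Each parallel instance sees only part of B and emits partial counts
// for all of A, so the partial counts are summed per id at the end:
val counts = datasetB
  .mapPartition(new CountSmallerPerA)
  .withBroadcastSet(datasetA, "A")
  .groupBy(0).sum(1)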

If dataset A doesn't fit into memory, things become more cumbersome and we need to play some tricks with range partitioning...

Let me know if you have questions,
Fabian


Re: For each element in a dataset, do something with another dataset

Pieter Hameete
Hi Fabian,

thanks for your tips!

Do you have some pointers for getting started with the 'tricky range partitioning'? I am quite keen to get this working with large datasets ;-)

Cheers,

Pieter


Re: For each element in a dataset, do something with another dataset

Fabian Hueske
The idea is to partition both datasets by range.
Assume your dataset A is [1,2,3,4,5,6] and you create two partitions: p1: [1,2,3] and p2: [4,5,6].
Each partition is given to a different instance of a MapPartition operator (this is a bit tricky, because you cannot use a broadcast set here; you could, for example, load the corresponding partition from HDFS in the open() function).

Dataset B is partitioned in the same way, i.e., all elements <= 3 go to p1 and everything > 3 goes to p2. You can partition a dataset by range using the partitionCustom() function. The partitioned dataset is given to the mapPartition operator that loaded the corresponding partition of dataset A in each task instance.
You do the counting just like before (sorting the partition of dataset A, binary search, long[]), but add an additional count for the complete partition (basically, count all elements that arrive in the task instance).

If you have a dataset B with [1,2,2,3,3,4,5,5,5,5,6,7], the counts for p1 would be [1:0, 2:1, 3:3, all:5] and for p2 [4:0, 5:1, 6:5, all:7].
Now you compute the final counts by adding the "all" counts of the lower partitions to the counts of the higher partitions, i.e., add the all:5 of p1 to all counts of p2, giving [4:5, 5:6, 6:10] for p2.

This approach requires knowing the value range and the distribution of the values, which makes it a bit difficult. I guess you'll get the best performance if you partition such that the partitions of dataset B are about equally sized, with the constraint that the corresponding partitions of A fit into memory.
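
For the partitionCustom() part, a minimal sketch (the split point is hardcoded to match the toy example above; in practice you would derive the split points from the value distribution, e.g. from a sample):

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// splits(i) is the inclusive upper bound of partition i; everything above
// the last split goes to the last partition.
class RangePartitioner(splits: Array[Long]) extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int): Int = {
    var p = 0
    while (p < splits.length && key > splits(p)) p += 1
    math.min(p, numPartitions - 1)
  }
}

// All elements <= 3 go to partition 0 and everything > 3 to partition 1,
// matching the p1/p2 example above.
val partitionedB = datasetB.partitionCustom(new RangePartitioner(Array(3L)), x => x)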

As I said, it's a bit cumbersome. I hope you could follow my explanation.
Please ask if something is not clear ;-)


Re: For each element in a dataset, do something with another dataset

Gábor Gévay
Hello,

Alternatively, if dataset B fits in memory but dataset A doesn't, then you can do it by broadcasting B to a RichMapPartitionFunction on A: in the open method of mapPartition, you sort B. Then, for each element of A, you do a binary search in B; the index found by the binary search is exactly the count that you are looking for.
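
In sketch form (untested; the (id, propertyX) layout of A and the plain Long elements of B are assumptions for the example):

import org.apache.flink.api.common.functions.RichMapPartitionFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Emits, for each element of A, the number of B elements below its propertyX.
class CountViaBroadcastB extends RichMapPartitionFunction[(Long, Long), (Long, Long)] {
  private var sortedB: Array[Long] = _

  override def open(parameters: Configuration): Unit = {
    sortedB = getRuntimeContext
      .getBroadcastVariable[Long]("B")
      .asScala.toArray.sorted
  }

  override def mapPartition(as: java.lang.Iterable[(Long, Long)],
                            out: Collector[(Long, Long)]): Unit = {
    for (a <- as.asScala) {
      // Index of the first B element >= a.propertyX, which is exactly
      // the number of B elements strictly smaller than it.
      var lo = 0
      var hi = sortedB.length
      while (lo < hi) {
        val mid = (lo + hi) >>> 1
        if (sortedB(mid) < a._2) lo = mid + 1 else hi = mid
      }
      out.collect((a._1, lo.toLong))
    }
  }
}

// Every output tuple is already a final count, so no aggregation is needed:
val counts = datasetA.mapPartition(new CountViaBroadcastB).withBroadcastSet(datasetB, "B")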

Best,
Gabor




Re: For each element in a dataset, do something with another dataset

Pieter Hameete
Hi Gabor, Fabian,

thank you for your suggestions. I am intending to scale up to the point where I'm sure that both A and B won't fit in memory. I'll see if I can come up with a nice way to partition the datasets, but if that takes too much time I'll just have to accept that it won't work on large datasets. I'll let you know if I manage to work something out, but I won't get to it until the weekend :-)

Cheers again,

Pieter


Re: For each element in a dataset, do something with another dataset

Pieter Hameete
Hi Fabian,

I have a question regarding the first approach. Is there a benefit to choosing a RichMapPartitionFunction over a RichMapFunction in this case? I assume that each broadcast dataset is sent only once to each task manager?

If I were to broadcast dataset B, then I could, for each element a in A, count the number of elements in B that are smaller than a and output a tuple in a map operation. This would also save me the step of aggregating the results?

Kind regards,

Pieter


Re: For each element in a dataset, do something with another dataset

Fabian Hueske
Hi Pieter,

a FlatMapFunction can only return values when its flatMap() method is called. However, in your use case, you would like to return values *after* the function has been called for the last time. This is not possible with a FlatMapFunction, because you cannot identify the last flatMap() call.
The MapPartitionFunction, in contrast, is called only once, with an iterator over the whole partition. Hence you can return values after the iterator has been fully consumed.

The broadcast set is sent only once in both cases.

If it is possible to broadcast dataset B, you can also use a MapFunction; then you don't need to store the count values at all.
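
A minimal sketch of that variant (the binary search is the same one as in Gabor's sketch above, and the names are again made up):

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

// MapFunction variant: every map() call returns the final (id, count)
// tuple directly, so no count array and no extra aggregation are needed.
class CountPerElement extends RichMapFunction[(Long, Long), (Long, Long)] {
  private var sortedB: Array[Long] = _

  override def open(parameters: Configuration): Unit = {
    sortedB = getRuntimeContext
      .getBroadcastVariable[Long]("B")
      .asScala.toArray.sorted
  }

  override def map(a: (Long, Long)): (Long, Long) = {
    // Count the B elements strictly smaller than a.propertyX.
    var lo = 0
    var hi = sortedB.length
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (sortedB(mid) < a._2) lo = mid + 1 else hi = mid
    }
    (a._1, lo.toLong)
  }
}

val counts = datasetA.map(new CountPerElement).withBroadcastSet(datasetB, "B")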

Best, Fabian
