How to get top N elements in a DataSet?


Ivan Mushketyk
Hi,

I have a dataset of tuples with two fields, IDs and ratings, and I need to find the 10 elements with the highest rating. I found a solution, but I think it is suboptimal and that there should be a better way to do it.

The best thing I came up with is to partition the dataset by rating, sort locally, and write the partitioned dataset to disk:

dataset
.partitionCustom(new Partitioner<Double>() {
  @Override
  public int partition(Double key, int numPartitions) {
    return key.intValue() % numPartitions;
  }
}, 1) // partition by rating
.setParallelism(5)
.sortPartition(1, Order.DESCENDING) // locally sort by rating
.writeAsText("..."); // write the partitioned dataset to disk

This will store tuples in sorted files with names 5, 4, 3, ... that contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read the sorted data from disk and take the N elements with the highest rating.
Is there a way to do the same without writing the partitioned dataset to disk?

I tried to use "first(10)" but it seems to give top 10 items from a random partition. Is there a way to get top N elements from every partition? Then I could locally sort top values from every partition and find top 10 global values.

Best regards,
Ivan.
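
To make the partitioner arithmetic in the post above concrete, here is a minimal plain-Java sketch that evaluates the same expression, `key.intValue() % numPartitions`, for a few sample ratings. The class name, the sample values, and the assumption that ratings lie between 0 and 5 with five partitions are illustrative and not taken from the original job.

```java
// Hypothetical demo class; it only reproduces the arithmetic of the
// Partitioner shown in the post and does not depend on Flink.
final class PartitionIndexDemo {

    // Same expression as in the custom Partitioner: truncate the rating
    // to an int, then take it modulo the number of partitions.
    static int partition(Double key, int numPartitions) {
        return key.intValue() % numPartitions;
    }

    public static void main(String[] args) {
        // Assumed sample ratings in the range 0 to 5.
        double[] ratings = {0.5, 1.0, 3.7, 4.0, 4.9, 5.0};
        for (double r : ratings) {
            System.out.println(r + " -> partition " + partition(r, 5));
        }
    }
}
```

Under these assumptions a rating is truncated before the modulo is applied, so 4.9 maps to partition 4, while a rating of exactly 5.0 wraps around to partition 0 when there are five partitions. It may be worth checking the boundary values before relying on the per-file ranges.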



Re: How to get top N elements in a DataSet?

Fabian Hueske-2
Hi Ivan,

I think you can use MapPartition for that.
So basically:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).setParallelism(1)
  .mapPartition(new ReturnFirstTen())

Best, Fabian
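
The thread does not show `ReturnFirstTen`, so the following is only a sketch of the two-stage idea in plain Java, without a Flink dependency: each partition is sorted in descending order and cut to its first N elements, and the surviving candidates are then sorted and cut once more in a single place. The `Rated` class, the method names, and the sample data are hypothetical stand-ins for the (id, rating) tuples. In an actual job the "take the first N" step would live inside a `MapPartitionFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical local simulation of "sort partition, keep first N" done twice.
final class TwoStageTopN {

    // Stand-in for an (id, rating) tuple.
    static final class Rated {
        final long id;
        final double rating;

        Rated(long id, double rating) {
            this.id = id;
            this.rating = rating;
        }
    }

    // Stand-in for sortPartition(1, DESCENDING) followed by ReturnFirstTen:
    // sort a copy by rating, descending, and keep at most n elements.
    static List<Rated> sortAndTakeFirst(List<Rated> partition, int n) {
        List<Rated> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparingDouble((Rated r) -> r.rating).reversed());
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    // Stage 1 runs once per partition; stage 2 runs once over all candidates,
    // which corresponds to the step that needs parallelism 1.
    static List<Rated> topN(List<List<Rated>> partitions, int n) {
        List<Rated> candidates = new ArrayList<>();
        for (List<Rated> partition : partitions) {
            candidates.addAll(sortAndTakeFirst(partition, n));
        }
        return sortAndTakeFirst(candidates, n);
    }

    public static void main(String[] args) {
        List<List<Rated>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(new Rated(1, 3.0), new Rated(2, 4.5)));
        partitions.add(Arrays.asList(new Rated(3, 5.0), new Rated(4, 1.0)));
        partitions.add(Arrays.asList(new Rated(5, 4.8)));

        for (Rated r : topN(partitions, 2)) {
            System.out.println(r.id + " " + r.rating);
        }
    }
}
```

The result is correct because any element of the global top N must also be in the top N of its own partition, so discarding the rest of each partition loses nothing.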


(In reply to Ivan Mushketyk, 2017-01-24 10:10 GMT+01:00)

Re: How to get top N elements in a DataSet?

Gábor Gévay
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549
Note that the discussion there suggests a more efficient approach,
which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time
on the mailing list.

Best,
Gábor



(In reply to Fabian Hueske, 2017-01-24 11:35 GMT+01:00)

Re: How to get top N elements in a DataSet?

Fabian Hueske-2
You are of course right, Gábor.
@Ivan, you can use a heap in the MapPartitionFunction to collect the top 10 elements (note that you need to create deep copies if object reuse is enabled [1]).

Best, Fabian
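
The heap suggestion above can be sketched in plain Java as follows. This is a minimal illustration, not code from the thread: the `Rated` class, its `copy()` method, and `topN` are hypothetical names, and the loop body is what one might place inside a `MapPartitionFunction` implementation. A min-heap bounded to N elements keeps the smallest retained rating at its head, so each incoming element costs at most O(log N) and the partition never needs to be fully sorted. Every retained element is copied, on the assumption that the caller may reuse the same object instance between iterations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a bounded min-heap top-N selection.
final class HeapTopN {

    // Mutable stand-in for an (id, rating) tuple.
    static final class Rated {
        long id;
        double rating;

        Rated(long id, double rating) {
            this.id = id;
            this.rating = rating;
        }

        // Copy so that later mutation of the input object cannot
        // change what is stored in the heap.
        Rated copy() {
            return new Rated(id, rating);
        }
    }

    static List<Rated> topN(Iterable<Rated> values, int n) {
        if (n <= 0) {
            return new ArrayList<>();
        }
        // Min-heap on rating: the head is the weakest of the current top n.
        PriorityQueue<Rated> heap =
                new PriorityQueue<>(n, Comparator.comparingDouble((Rated r) -> r.rating));
        for (Rated r : values) {
            if (heap.size() < n) {
                heap.add(r.copy());
            } else if (r.rating > heap.peek().rating) {
                heap.poll();          // drop the weakest retained element
                heap.add(r.copy());   // keep the stronger newcomer
            }
        }
        // Return the retained elements ordered by rating, descending.
        List<Rated> result = new ArrayList<>(heap);
        result.sort(Comparator.comparingDouble((Rated r) -> r.rating).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<Rated> input = Arrays.asList(
                new Rated(1, 3.0), new Rated(2, 4.5), new Rated(3, 5.0),
                new Rated(4, 1.0), new Rated(5, 4.8), new Rated(6, 2.2));
        for (Rated r : topN(input, 3)) {
            System.out.println(r.id + " " + r.rating);
        }
    }
}
```

The same method could serve both stages: once per partition to produce local candidates, and once with parallelism 1 over the combined candidates.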

(In reply to Gábor Gévay, 2017-01-24 11:49 GMT+01:00)

Re: How to get top N elements in a DataSet?

Aljoscha Krettek
@Fabian, I think there's a typo in your code. Shouldn't it be:

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen()).setParallelism(1)

i.e., the second MapPartition has to run with parallelism 1.


(In reply to Fabian Hueske, Tue, 24 Jan 2017 at 11:57)

Re: How to get top N elements in a DataSet?

Fabian Hueske-2
Aljoscha, you are right.
The second mapPartition() needs parallelism 1, and so does the second sortPartition():

dataset // assuming some partitioning that can be reused to avoid a shuffle
  .sortPartition(1, Order.DESCENDING)
  .mapPartition(new ReturnFirstTen())
  .sortPartition(1, Order.DESCENDING).setParallelism(1)
  .mapPartition(new ReturnFirstTen()).setParallelism(1)

Anyway, as Gábor pointed out, this solution is very inefficient because it sorts entire partitions.

(In reply to Aljoscha Krettek, 2017-01-24 17:52 GMT+01:00)

Re: How to get top N elements in a DataSet?

Ivan Mushketyk
Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

(In reply to Fabian Hueske, Tue, 24 Jan 2017 at 17:04)