Hi,

I have a dataset of tuples with two fields, ids and ratings, and I need to find the 10 elements with the highest rating in this dataset. I found a solution, but I think it's suboptimal and there should be a better way to do it.

The best thing that I came up with is to partition the dataset by rating, sort locally, and write the partitioned dataset to disk:

    dataset
        .partitionCustom(new Partitioner<Double>() {
            @Override
            public int partition(Double key, int numPartitions) {
                return key.intValue() % numPartitions;
            }
        }, 1) // partition by rating
        .setParallelism(5)
        .sortPartition(1, Order.DESCENDING) // locally sort by rating
        .writeAsText("..."); // write the partitioned dataset to disk

This will store tuples in sorted files with names 5, 4, 3, ... that contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read the sorted data from disk and find the N elements with the highest rating.

Is there a way to do the same but without writing a partitioned dataset to disk?

I tried to use "first(10)" but it seems to give the top 10 items from a random partition. Is there a way to get the top N elements from every partition? Then I could locally sort the top values from every partition and find the top 10 global values.

Best regards,
Ivan.
|
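Hi Ivan,

I think you can use MapPartition for that.
So basically:

    dataset // assuming some partitioning that can be reused to avoid a shuffle
        .sortPartition(1, Order.DESCENDING)
        .mapPartition(new ReturnFirstTen())
        .sortPartition(1, Order.DESCENDING).setParallelism(1)
        .mapPartition(new ReturnFirstTen())

Best, Fabian

2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <[hidden email]>: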
|
Hello,

Btw. there is a Jira about this:
https://issues.apache.org/jira/browse/FLINK-2549

Note that the discussion there suggests a more efficient approach, which doesn't involve sorting the entire partitions.

And if I remember correctly, this question comes up from time to time on the mailing list.

Best,
Gábor

2017-01-24 11:35 GMT+01:00 Fabian Hueske <[hidden email]>:
> Hi Ivan,
>
> I think you can use MapPartition for that.
> So basically:
>
> dataset // assuming some partitioning that can be reused to avoid a shuffle
>     .sortPartition(1, Order.DESCENDING)
>     .mapPartition(new ReturnFirstTen())
>     .sortPartition(1, Order.DESCENDING).setParallelism(1)
>     .mapPartition(new ReturnFirstTen())
>
> Best, Fabian
>
>
> 2017-01-24 10:10 GMT+01:00 Ivan Mushketyk <[hidden email]>:
>>
>> Hi,
>>
>> I have a dataset of tuples with two fields ids and ratings and I need to
>> find 10 elements with the highest rating in this dataset. I found a
>> solution, but I think it's suboptimal and I think there should be a better
>> way to do it.
>>
>> The best thing that I came up with is to partition dataset by rating, sort
>> locally and write the partitioned dataset to disk:
>>
>> dataset
>>     .partitionCustom(new Partitioner<Double>() {
>>         @Override
>>         public int partition(Double key, int numPartitions) {
>>             return key.intValue() % numPartitions;
>>         }
>>     }, 1) // partition by rating
>>     .setParallelism(5)
>>     .sortPartition(1, Order.DESCENDING) // locally sort by rating
>>     .writeAsText("..."); // write the partitioned dataset to disk
>>
>> This will store tuples in sorted files with names 5, 4, 3, ... that
>> contain ratings in ranges (5, 4], (4, 3], and so on. Then I can read sorted
>> data from disk and find the N elements with the highest rating.
>> Is there a way to do the same but without writing a partitioned dataset to
>> a disk?
>>
>> I tried to use "first(10)" but it seems to give top 10 items from a random
>> partition. Is there a way to get top N elements from every partition? Then I
>> could locally sort top values from every partition and find top 10 global
>> values.
>>
>> Best regards,
>> Ivan.
>>
>>
>
|
You are of course right, Gabor.

@Ivan, you can use a heap in the MapPartitionFunction to collect the top 10 elements (note that you need to create deep copies if object reuse is enabled [1]).

Best, Fabian

2017-01-24 11:49 GMT+01:00 Gábor Gévay <[hidden email]>:
Hello,
|
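Fabian's heap suggestion can be sketched roughly as follows. This is plain Java without any Flink dependency, so it only illustrates the selection logic: in a real job the loop would presumably sit inside a MapPartitionFunction's mapPartition(Iterable<T> values, Collector<O> out) method and emit through the collector. The names Rated, HeapTopN, topN and copy() are made up for this example, and the (id, rating) layout is assumed from Ivan's description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for the (id, rating) tuples from the thread.
final class Rated {
    final long id;
    final double rating;

    Rated(long id, double rating) {
        this.id = id;
        this.rating = rating;
    }

    // Deep copy: needed if the runtime hands out the same mutable object
    // for every element (the object-reuse case Fabian mentions).
    Rated copy() {
        return new Rated(id, rating);
    }
}

final class HeapTopN {
    // Returns the n highest-rated elements of one partition, best first.
    // Keeps at most n elements in memory instead of sorting the partition.
    static List<Rated> topN(Iterable<Rated> partition, int n) {
        List<Rated> result = new ArrayList<>();
        if (n <= 0) {
            return result;
        }
        // Min-heap on rating: the head is the weakest of the current top n.
        PriorityQueue<Rated> heap =
                new PriorityQueue<>(n, Comparator.comparingDouble((Rated r) -> r.rating));
        for (Rated r : partition) {
            if (heap.size() < n) {
                heap.add(r.copy());
            } else if (r.rating > heap.peek().rating) {
                heap.poll();        // drop the current weakest element
                heap.add(r.copy()); // and keep the better one
            }
        }
        result.addAll(heap);
        result.sort(Comparator.comparingDouble((Rated r) -> r.rating).reversed());
        return result;
    }

    public static void main(String[] args) {
        List<Rated> partition = Arrays.asList(
                new Rated(1, 3.5), new Rated(2, 4.9), new Rated(3, 1.0), new Rated(4, 4.2));
        for (Rated r : topN(partition, 2)) {
            System.out.println(r.id + " " + r.rating);
        }
    }
}
```

Compared with sorting each partition, this does one pass per partition and holds only N elements at a time, which seems to be the kind of saving the Jira discussion is after. The per-partition results still have to be combined in a final step with parallelism 1.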
@Fabian, I think there's a typo in your code, shouldn't it be

    dataset // assuming some partitioning that can be reused to avoid a shuffle
        .sortPartition(1, Order.DESCENDING)
        .mapPartition(new ReturnFirstTen())

i.e. the second MapPartition has to be parallelism=1

On Tue, 24 Jan 2017 at 11:57 Fabian Hueske <[hidden email]> wrote:
|
Aljoscha, you are right.
Anyway, as Gabor pointed out, this solution is very inefficient.

The second mapPartition() needs to have parallelism 1, and the sortPartition() does as well:

    dataset // assuming some partitioning that can be reused to avoid a shuffle
        .sortPartition(1, Order.DESCENDING)
        .mapPartition(new ReturnFirstTen())

2017-01-24 17:52 GMT+01:00 Aljoscha Krettek <[hidden email]>:
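
As a rough, Flink-free illustration of the two-stage plan discussed in this thread (sort each partition and keep its first N, then do the same once more over the combined candidates, which corresponds to the stage that runs with parallelism 1), here is a minimal plain-Java sketch. Partitions are modelled as in-memory lists, and the names Scored, TwoStageTopN, sortAndTakeFirst and globalTopN are made up for this example; they are not Flink API, and ReturnFirstTen is only assumed to behave like sortAndTakeFirst with N = 10 on already sorted input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the (id, rating) tuples from the thread.
final class Scored {
    final long id;
    final double rating;

    Scored(long id, double rating) {
        this.id = id;
        this.rating = rating;
    }
}

final class TwoStageTopN {
    static final Comparator<Scored> BY_RATING_DESC =
            Comparator.comparingDouble((Scored s) -> s.rating).reversed();

    // Stage building block: sort one partition by rating (descending)
    // and keep only its first n elements.
    static List<Scored> sortAndTakeFirst(List<Scored> partition, int n) {
        List<Scored> sorted = new ArrayList<>(partition);
        sorted.sort(BY_RATING_DESC);
        int end = Math.max(0, Math.min(n, sorted.size()));
        return new ArrayList<>(sorted.subList(0, end));
    }

    // Stage 1 runs once per partition; stage 2 runs once over the union of
    // all local results, i.e. the step that needs parallelism 1.
    static List<Scored> globalTopN(List<List<Scored>> partitions, int n) {
        List<Scored> candidates = new ArrayList<>();
        for (List<Scored> partition : partitions) {
            candidates.addAll(sortAndTakeFirst(partition, n));
        }
        return sortAndTakeFirst(candidates, n);
    }

    public static void main(String[] args) {
        List<List<Scored>> partitions = Arrays.asList(
                Arrays.asList(new Scored(1, 2.0), new Scored(2, 5.0), new Scored(3, 4.0)),
                Arrays.asList(new Scored(4, 4.5), new Scored(5, 1.0)));
        for (Scored s : globalTopN(partitions, 2)) {
            System.out.println(s.id + " " + s.rating);
        }
    }
}
```

The second stage only ever sees at most N elements per partition, so running it on a single task is cheap even though the first stage still sorts every partition in full.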
|
Hi @Fabian, @Gabor, and @Aljoscha,

Thank you for your help! It works as expected.

Best regards,
Ivan.

On Tue, 24 Jan 2017 at 17:04 Fabian Hueske <[hidden email]> wrote:
|