Hello :)
my Flink program is extreme slow. I implemented a set similarity join in Flink (Mass-Join). Furthermore, I implemented a local version in Java. I compared both Implementations. The Local version needs one minute to compute a 500MB Dataset. My Flink program needs 5 minutes (cluster: 11 nodes, 20 000 MB RAM). I use the Flink version 0.6. What could be the cause? I would welcome your response, Florian Hönicke massJoin.zip (37K) Download Attachment |
Hi! Ad-hoc, that is not easy to say. It depends on your algorithm, how much data replication it does... We'd need a bit of time to look into the code. It would help if you could roughly sketch the algorithm for us and give us a breakdown of how much time is spent in which operator (like a screenshot of the runtime web monitor). Greetings, Stephan On Wed, Sep 24, 2014 at 6:18 PM, Florian Hönicke <[hidden email]> wrote: Hello :) |
Hi, how did you specify the degree of parallelism DOP for your program? Via the command-line client or system-configuration or otherwise? The JobManager log file (./log/*jobManager*.log) contains you the DOP of each task. Best, Fabian 2014-09-24 18:41 GMT+02:00 Stephan Ewen <[hidden email]>:
|
Thanks for your quick answer.
In the following, I roughly sketch the mass-join algorithm. http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf It's a R-S-Join which i modified to a self-join. Given a set of token sets. The massJoin finds all similar sets (regarding to the Jaccard Similarity(intersection/union)) First, it calculates a global token grouping, i.e., each to token is grouped in one of 30 groups. Each group has almost the same token count. Than, it generates two types of signatures for each input set. If two sets are similar, they must share a common signature. In the next step, we find all candidate pairs (pairs which share a common signature). Some candidate pairs are filtered using the global token grouping. The remaining candidate pairs are verified to filter out all dissimilar pairs. @Fabian I specified the DOP via the command-line client as follows: /home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt file:///home/hoenicke/flink-0.6-incubating/output -v The log file is attached. Best, Florian Am 24.09.2014 um 22:45 schrieb Fabian
Hueske:
flowLayoutMassJoin.png (87K) Download Attachment StackLayoutMassJoin.png (86K) Download Attachment massJoin.zip (37K) Download Attachment JobManagerLog.txt (300K) Download Attachment |
OK, the log shows that the tasks are evenly distributed to all nodes. I assume you run the program on the cluster as well on 500MB, right? Can you please also post the execution plan for the cluster execution? You get it with (See also: http://flink.incubator.apache.org/docs/0.6-incubating/cli.html): ./flink info -e jarfile.jar <parameters> Thanks, Fabian 2014-09-25 0:21 GMT+02:00 Florian Hönicke <[hidden email]>:
|
yes. I ran the massJoin on the cluster as well on 500MB.
I attached the execution plan. Greetings, Florian Am 25.09.2014 um 00:41 schrieb Fabian
Hueske:
massJoin.zip (37K) Download Attachment flowLayoutMassJoin.png (87K) Download Attachment StackLayoutMassJoin.png (86K) Download Attachment executionPlan.txt (24K) Download Attachment JobManagerLog.txt (300K) Download Attachment |
Hi, the plan shows all operator DOPs as 1. Did you create the plan locally or on the cluster with the correct DOP? The CLI client offers the -p parameter also for "info -e". BTW, you could try to set the DOP to the number of cores in your cluster. (But that doesn't explain why the job is so slow). 2014-09-25 10:01 GMT+02:00 Florian Hönicke <[hidden email]>:
|
Your program is doing quite a few repartitioning steps, where all data comes from a single data source. You could try two things: - triple the DataSource and Map Function that go into the two Signature FlatMaps and the two later CoGroups such that you have two source->map for each FlatMap and another one for the two later CoGroups. - check out if SemanticAnnotations can help you to prevent expensive repartitionings and sortings for the cogroups (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html). Best, Fabian 2014-09-25 10:51 GMT+02:00 Fabian Hueske <[hidden email]>:
|
Thanks a lot :)
I set some semantic annotations. Now it needs 2 minutes. Am 25.09.2014 11:32, schrieb Fabian Hueske:
|
Could you share the code?it sounds interesting to try! On Oct 2, 2014 7:31 PM, "Florian Hönicke" <[hidden email]> wrote:
|
The code is attached.
Input format: <SetID=1, token_1, token_7, token_11, token_20...token_i> <SetID=2, token_2,
token_4...token_j>
.... In the file it looks like: 1 1,7,11,20 2 2,4 We assume that all tokens (token_1...token_n) are sorted by their global token frequency. Token_1 is the least frequent token and token_n is the most frequent token. Greetings Florian -------- Original-Nachricht --------
Could you share the code?it sounds interesting to try! On Oct 2, 2014 7:31 PM, "Florian Hönicke"
<[hidden email]>
wrote:
Massflink6.zip (36K) Download Attachment |
Thanks Florian, I'll try it too in the next weeks! On Oct 2, 2014 8:00 PM, "Florian Hönicke" <[hidden email]> wrote:
|
Free forum by Nabble | Edit this page |