long runtime

Rockstar Flo
Hello :)

My Flink program is extremely slow.
I implemented a set similarity join (Mass-Join) in Flink.
I also implemented a local version in Java and compared the two implementations.
The local version needs one minute to process a 500 MB dataset.
The Flink program needs 5 minutes (cluster: 11 nodes, 20,000 MB RAM).
I am using Flink version 0.6.
What could be the cause?

I look forward to your response,
Florian Hönicke

massJoin.zip (37K) Download Attachment

Re: long runtime

Stephan Ewen
Hi!

Offhand, that is not easy to say. It depends on your algorithm and on how much data replication it does.

We'd need a bit of time to look into the code. It would help if you could roughly sketch the algorithm for us and give us a breakdown of how much time is spent in which operator (like a screenshot of the runtime web monitor).

Greetings,
Stephan


Re: long runtime

Fabian Hueske
Hi,

How did you specify the degree of parallelism (DOP) for your program?
Via the command-line client, the system configuration, or some other way?

The JobManager log file (./log/*jobManager*.log) shows the DOP of each task.

Best, Fabian

Re: long runtime

Rockstar Flo
Thanks for your quick answer.
Here is a rough sketch of the Mass-Join algorithm:
http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
It is an R-S join, which I modified into a self-join.
Given a collection of token sets, Mass-Join finds all pairs of similar sets with respect to Jaccard similarity (|intersection| / |union|).
First, it computes a global token grouping, i.e., each token is assigned to one of 30 groups, and each group has almost the same token count.
Then, it generates two types of signatures for each input set.
If two sets are similar, they must share a common signature.
In the next step, we find all candidate pairs (pairs which share a common signature).
Some candidate pairs are filtered using the global token grouping.
The remaining candidate pairs are verified to filter out all dissimilar pairs.
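
For readers following along, the last two steps (finding candidate pairs via shared signatures, then verifying them with Jaccard similarity) can be sketched roughly as below. This is only an illustration in plain Python, not the attached Flink code: the function names are made up, and the toy signatures (every token is its own signature) are an assumption that stands in for the two signature types Mass-Join actually generates. The token-group filter is left out.

```python
from collections import defaultdict
from itertools import combinations


def jaccard(a, b):
    """Jaccard similarity |a & b| / |a | b| of two token sets."""
    if not a and not b:
        return 1.0  # convention chosen here for two empty sets
    return len(a & b) / len(a | b)


def candidate_pairs(signatures):
    """Return all pairs of set IDs that share at least one signature.

    `signatures` maps set ID -> iterable of hashable signatures.
    How real Mass-Join signatures are built is not shown here.
    """
    by_signature = defaultdict(set)
    for set_id, sigs in signatures.items():
        for sig in sigs:
            by_signature[sig].add(set_id)
    pairs = set()
    for ids in by_signature.values():
        # Every two sets listed under the same signature form a candidate.
        pairs.update(combinations(sorted(ids), 2))
    return pairs


def verify(pairs, token_sets, threshold):
    """Keep only candidate pairs whose Jaccard similarity reaches the threshold."""
    return sorted(
        (r, s)
        for r, s in pairs
        if jaccard(token_sets[r], token_sets[s]) >= threshold
    )


token_sets = {1: {1, 7, 11, 20}, 2: {1, 7, 11, 21}, 3: {2, 4}}
# Hypothetical toy signatures: each token acts as a signature.
signatures = {set_id: tokens for set_id, tokens in token_sets.items()}
similar = verify(candidate_pairs(signatures), token_sets, threshold=0.5)
```

With these toy inputs only sets 1 and 2 share a signature, so they are the only candidate pair, and they pass verification because they share 3 of 5 distinct tokens.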

@Fabian
I specified the DOP via the command-line client as follows:
/home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \
  file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt file:///home/hoenicke/flink-0.6-incubating/output -v

The log file is attached.

Best, Florian

flowLayoutMassJoin.png (87K) Download Attachment
StackLayoutMassJoin.png (86K) Download Attachment
massJoin.zip (37K) Download Attachment
JobManagerLog.txt (300K) Download Attachment

Re: long runtime

Fabian Hueske
OK, the log shows that the tasks are evenly distributed across all nodes.
I assume you also ran the program on the cluster with the 500 MB dataset, right?

Can you please also post the execution plan for the cluster execution?
./flink info -e jarfile.jar <parameters>

Thanks, Fabian

Re: long runtime

Rockstar Flo
Yes, I also ran the Mass-Join on the cluster with the 500 MB dataset.
I attached the execution plan.

Greetings,
Florian


massJoin.zip (37K) Download Attachment
flowLayoutMassJoin.png (87K) Download Attachment
StackLayoutMassJoin.png (86K) Download Attachment
executionPlan.txt (24K) Download Attachment
JobManagerLog.txt (300K) Download Attachment

Re: long runtime

Fabian Hueske
Hi,

The plan shows a DOP of 1 for every operator.
Did you create the plan locally, or on the cluster with the correct DOP? The CLI client also accepts the -p parameter for "info -e".

BTW, you could try setting the DOP to the number of cores in your cluster (but that doesn't explain why the job is so slow).

Re: long runtime

Fabian Hueske
Your program performs quite a few repartitioning steps, and all data comes from a single data source.
You could try two things:
- Triple the DataSource and Map function that feed the two signature FlatMaps and the two later CoGroups, so that each of the two FlatMaps gets its own source->map and the two later CoGroups share a third one.
- Check whether semantic annotations can help you avoid expensive repartitioning and sorting for the CoGroups (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).

Best, Fabian

Re: long runtime

Rockstar Flo
Thanks a lot :)
I added some semantic annotations.
Now it takes 2 minutes.

Re: long runtime

Flavio Pompermaier

Could you share the code? It sounds interesting to try!

Re: Re: long runtime

Rockstar Flo
The code is attached.
Input format: 
<SetID=1, token_1, token_7, token_11, token_20...token_i>
<SetID=2, token_2, token_4...token_j>
....
In the file it looks like:
1 1,7,11,20
2 2,4
We assume that all tokens (token_1...token_n) are sorted by their global token frequency.
Token_1 is the least frequent token and token_n is the most frequent token.
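
A small sketch of how a file in this format could be read, for anyone who wants to try the attached code with their own data. It is plain Python, not part of the attachment. The function names are hypothetical, and it assumes that whitespace separates the set ID from the comma-separated token list, as in the two sample lines above.

```python
def parse_line(line):
    """Parse '<set_id> <token>,<token>,...' into (set_id, [tokens])."""
    parts = line.split(None, 1)  # split once on any whitespace
    set_id = int(parts[0])
    token_field = parts[1].strip() if len(parts) > 1 else ""
    tokens = [int(t) for t in token_field.split(",") if t]
    return set_id, tokens


def parse_input(lines):
    """Parse all non-empty lines into a dict: set ID -> list of tokens."""
    return dict(parse_line(line) for line in lines if line.strip())


sample = ["1 1,7,11,20", "2 2,4"]
token_sets = parse_input(sample)
```

Tokens are kept as lists rather than sets so that their order (least frequent token first) is preserved.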

Greetings, Florian

On Oct 2, 2014 7:31 PM, "Florian Hönicke" <[hidden email]> wrote:
[...]
Edit: the triple DataSource does not have an influence.

Massflink6.zip (36K) Download Attachment

Re: Re: long runtime

Flavio Pompermaier

Thanks, Florian, I'll try it too in the coming weeks!

On Oct 2, 2014 8:00 PM, "Florian Hönicke" <[hidden email]> wrote:
The code is attached.
Input format: 
<SetID=1, token_1, token_7, token_11, token_20...token_i>
<SetID=2, token_2, token_4...token_j>
....
In the file it looks like:
1 1,7,11,20
2 2,4
We assume that all tokens (token_1...token_n) are sorted by their global token frequency.
Token_1 is the least frequent token and token_n is the most frequent token.

Greetings Florian

-------- Original Message --------
Subject: Re: long runtime
Date: Thu, 2 Oct 2014 19:42:58 +0200
From: Flavio Pompermaier [hidden email]
Reply-To: [hidden email]
To: [hidden email]

Could you share the code? It sounds interesting to try!

On Oct 2, 2014 7:31 PM, "Florian Hönicke" <[hidden email]> wrote:
Thanks a lot :)
I added some semantic annotations.
The job now takes 2 minutes.
Edit: tripling the DataSource has no effect.

On 25.09.2014 at 11:32, Fabian Hueske wrote:
Your program performs quite a few repartitioning steps, and all data comes from a single data source.
You could try two things:
- Triple the DataSource and Map function that feed the two Signature FlatMaps and the two later CoGroups, so that each FlatMap gets its own source->map and the two later CoGroups share a third one.
- Check whether SemanticAnnotations can help you avoid expensive repartitioning and sorting for the CoGroups (http://flink.incubator.apache.org/docs/0.6-incubating/java_api_guide.html).

Best, Fabian

2014-09-25 10:51 GMT+02:00 Fabian Hueske <[hidden email]>:
Hi,

the plan shows all operator DOPs as 1.
Did you create the plan locally or on the cluster with the correct DOP? The CLI client also accepts the -p parameter for "info -e".

BTW, you could try setting the DOP to the number of cores in your cluster (but that doesn't explain why the job is so slow).

2014-09-25 10:01 GMT+02:00 Florian Hönicke <[hidden email]>:
Yes, I also ran the massJoin on the cluster with the 500 MB dataset.
I attached the execution plan.

Greetings,
Florian


On 25.09.2014 at 00:41, Fabian Hueske wrote:
OK, the log shows that the tasks are evenly distributed across all nodes.
I assume you also ran the program on the cluster with the 500 MB dataset, right?

Can you please also post the execution plan for the cluster execution?
./flink info -e jarfile.jar <parameters>

Thanks, Fabian

2014-09-25 0:21 GMT+02:00 Florian Hönicke <[hidden email]>:
Thanks for your quick answer.
In the following, I roughly sketch the mass-join algorithm.
http://www.cs.berkeley.edu/~jnwang/papers/icde14_massjoin.pdf
It's an R-S join which I modified into a self-join.
Given a set of token sets, the massJoin finds all similar sets (with respect to the Jaccard similarity, i.e., intersection/union).
First, it calculates a global token grouping, i.e., each token is assigned to one of 30 groups. Each group has almost the same token count.
Then, it generates two types of signatures for each input set.
If two sets are similar, they must share a common signature.
In the next step, we find all candidate pairs (pairs which share a common signature).
Some candidate pairs are filtered using the global token grouping.
The remaining candidate pairs are verified to filter out all dissimilar pairs.
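
The final verification step could look roughly like the following plain-Java sketch. It is not the code from the attachment and not the method from the paper: the class and method names (JaccardVerifier, intersectionSize, jaccard, isSimilar) are hypothetical. It assumes each set is an ascending-sorted, duplicate-free array of integer token IDs, and the threshold value is only illustrative.

```java
// Hypothetical helper, not part of the attached massJoin code.
final class JaccardVerifier {

    /** Counts common tokens of two ascending-sorted, duplicate-free arrays by merging. */
    static int intersectionSize(int[] a, int[] b) {
        int i = 0, j = 0, common = 0;
        while (i < a.length && j < b.length) {
            if (a[i] == b[j]) {
                common++;
                i++;
                j++;
            } else if (a[i] < b[j]) {
                i++;
            } else {
                j++;
            }
        }
        return common;
    }

    /** Jaccard similarity = |intersection| / |union|. */
    static double jaccard(int[] a, int[] b) {
        int common = intersectionSize(a, b);
        int union = a.length + b.length - common;
        // Convention chosen for this sketch: two empty sets count as identical.
        return union == 0 ? 1.0 : (double) common / union;
    }

    /** A candidate pair survives verification if it reaches the threshold. */
    static boolean isSimilar(int[] a, int[] b, double threshold) {
        return jaccard(a, b) >= threshold;
    }

    public static void main(String[] args) {
        int[] r = {1, 7, 11, 20};
        int[] s = {1, 7, 11, 21};
        System.out.println(jaccard(r, s));        // 3 common tokens, 5 in the union: prints 0.6
        System.out.println(isSimilar(r, s, 0.9)); // prints false
    }
}
```

Because both arrays are sorted, the merge needs only one pass over each set, so verifying a candidate pair is linear in the two set sizes.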

@Fabian
I specified the DOP via the command-line client as follows:
/home/hoenicke/flink-0.6-incubating/bin/flink run -p 11 /home/hoenicke/flink-0.6-incubating/jar/mass6.jar 0.9 \file:///home/hoenicke/flink-0.6-incubating/input/inputNummeriert.txt file:///home/hoenicke/flink-0.6-incubating/output -v

The log file is attached.

Best, Florian
