Spargel: Memory runs out at setNewVertexValue()

Spargel: Memory runs out at setNewVertexValue()

Attila Bernáth
Dear Developers,

We are experimenting with a PageRank variant in which the nodes of the
input graph are grouped into supernodes. Nodes send messages to
supernodes instead of to individual nodes; this way we expect to reduce
the number of messages and speed up the algorithm.
We implemented this algorithm with the Spargel API, using vertex-centric
iterations. The VertexValue type contains all the information that a
supernode has to know: the list of the nodes grouped into this
supernode, their current PageRank, their in-neighbours, etc.
We run this algorithm on a cluster of some 40-50 machines with an input
graph of roughly 1 million nodes. One particular machine (always the
same one) reliably runs out of memory at the vertex state update. The
error message is as follows.

Error: The program execution failed: java.lang.RuntimeException:
Memory ran out. Compaction failed. numPartitions: 32 minPartition: 1
maxPartition: 155 number of overflow segments: 0 bucketSize: 178
Overall memory: 32604160 Partition memory: 24248320 Message: null
    at hu.sztaki.ilab.cumulonimbus.custom_pagerank_spargel.SuperNodeRankUpdater.updateVertex(SuperNodeRankUpdater.java:71)
    at hu.sztaki.ilab.cumulonimbus.custom_pagerank_spargel.SuperNodeRankUpdater.updateVertex(SuperNodeRankUpdater.java:15)
    at org.apache.flink.spargel.java.VertexCentricIteration$VertexUpdateUdf.coGroup(VertexCentricIteration.java:430)
    at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:141)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:510)
    at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:137)
    at org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:109)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:375)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:265)
    at java.lang.Thread.run(Thread.java:724)

Line 71 in SuperNodeRankUpdater is a call to setNewVertexValue().
Do you have any suggestions? Shall I try to put together an example?
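
For context, here is a minimal sketch of what our updater roughly looks
like (SuperNodeValue and its updateRanks() method are placeholder names
standing in for our actual classes):

    import org.apache.flink.spargel.java.MessageIterator;
    import org.apache.flink.spargel.java.VertexUpdateFunction;

    public class SuperNodeRankUpdater
            extends VertexUpdateFunction<Long, SuperNodeValue, Double> {

        @Override
        public void updateVertex(Long vertexKey, SuperNodeValue vertexValue,
                MessageIterator<Double> inMessages) {
            // sum up the rank contributions sent to this supernode
            double sum = 0.0;
            for (double msg : inMessages) {
                sum += msg;
            }
            // SuperNodeValue is a placeholder: it holds the grouped nodes,
            // their current PageRank and their in-neighbours
            vertexValue.updateRanks(sum);
            // this is the call on line 71 where the memory runs out
            setNewVertexValue(vertexValue);
        }
    }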

Thank you!

Attila

Re: Spargel: Memory runs out at setNewVertexValue()

Stephan Ewen

Hey!

Thanks for the observation. Here is what I can see:

The distribution of hash values is very skewed: one partition holds a single buffer, while another holds 155. Are your objects very different in size, or is the hash function flawed? A more even distribution would help a lot here.
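
For illustration: if your vertex key is a custom type, it is worth checking that its hashCode() mixes the bits well. A quick sketch of what I mean (SuperNodeKey and its id field are made-up names, not from your code):

    public class SuperNodeKey implements Comparable<SuperNodeKey> {
        private final long id;

        public SuperNodeKey(long id) {
            this.id = id;
        }

        @Override
        public int hashCode() {
            // multiply by a large odd constant and fold the high bits in,
            // so that sequential ids spread evenly over the hash partitions
            long h = id * 0x9E3779B97F4A7C15L;
            return (int) (h ^ (h >>> 32));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof SuperNodeKey && ((SuperNodeKey) o).id == id;
        }

        @Override
        public int compareTo(SuperNodeKey other) {
            return Long.compare(this.id, other.id);
        }
    }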

The solution set of the delta iterations is the Achilles' heel of the system right now. We are actively working on making its memory management adaptive, so that it gets more memory when needed. Expect a fix in a few weeks.

In the meantime, let me try to put together a patch for an unofficial non-managed-memory solution set. That should be able to grow into the heap and grab more memory as needed.

Stephan

Re: Spargel: Memory runs out at setNewVertexValue()

Attila Bernáth
Dear Stephan,

Thank you for your answer; it helped me understand what was going on.

Attila


Re: Spargel: Memory runs out at setNewVertexValue()

Stephan Ewen
Hi Attila!

We have a fix that should help you run your job for the time being: if you update to the latest master (in git right now, in the Maven snapshot repositories after some sync interval),
you will find that delta iterations and Spargel now offer the method "setSolutionSetUnManaged()". If you set it to true, the solution set memory will not be managed by the Flink runtime,
which should work around the current limitation.
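
In a Spargel program the call would look roughly like this (a sketch; SuperNodeValue, SuperNodeRankUpdater and SuperNodeRankMessenger are placeholders for your own types):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.spargel.java.VertexCentricIteration;

    // build the vertex-centric iteration as before
    VertexCentricIteration<Long, SuperNodeValue, Double, ?> iteration =
        VertexCentricIteration.withPlainEdges(
            edges, new SuperNodeRankUpdater(), new SuperNodeRankMessenger(), maxIterations);

    // keep the solution set on the Java heap instead of in Flink's
    // managed memory, so it can grow beyond the fixed memory budget
    iteration.setSolutionSetUnManaged(true);

    DataSet<Tuple2<Long, SuperNodeValue>> result = initialVertices.runOperation(iteration);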

In the meantime, I am still working on making the memory management adaptive, so that this workaround will not be needed in the future.

Here are guidelines on how to use the latest snapshot version: http://flink.incubator.apache.org/downloads.html#latest

Greetings,
Stephan


Re: Spargel: Memory runs out at setNewVertexValue()

Attila Bernáth
Dear Stephan,

Sorry for not answering sooner.
I will try your solution in the next few days.

Attila
