Hi, While running PageRank on a synthetic graph I run into this problem: Any advice on how should I proceed to overcome this memory issue? IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Thanks! Best, Ovidiu
|
Hi
I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: > A possible workaround is to use the option "setSolutionSetUnmanaged(true)" > on the iteration. That will eliminate the fragmentation issue, at least. Unfortunately, you cannot set this when using graph.run(new PageRank(...)) I created a Gist which shows you how to set this using PageRank https://gist.github.com/s1ck/801a8ef97ce374b358df Please let us know if it worked out for you. Cheers, Martin [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: > Hi, > > While running PageRank on a synthetic graph I run into this problem: > Any advice on how should I proceed to overcome this memory issue? > > IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ > java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) > at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) > at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) > at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > > Thanks! > > Best, > Ovidiu > |
Thank you for this alternative.
I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph.
Running Connected Components on the same graph gives the same problem. IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) Best, Ovidiu
|
Hi Ovidiu, this option won't fix the problem if your system doesn't have enough memory :) It only defines whether the solution set is kept in managed memory or not. For more iteration configuration options, take a look at the Gelly documentation [1]. -Vasia. On 14 March 2016 at 17:55, Ovidiu-Cristian MARCU <[hidden email]> wrote:
|
In reply to this post by Ovidiu-Cristian MARCU
Hi,
I understand the confusion. So far, I did not run into the problem, but I think this needs to be adressed as all our graph processing abstractions are implemented on top of the delta iteration. According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. If this is the still the case, we should open an issue for that. Any further opinions on that? Cheers, Martin On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote: > Thank you for this alternative. > I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph. > > Running Connected Components on the same graph gives the same problem. > > IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED > java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) > at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) > at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) > at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) > at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) > at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) > at java.lang.Thread.run(Thread.java:745) > > Best, > Ovidiu > >> On 14 Mar 2016, at 17:36, Martin Junghanns <[hidden email]> wrote: >> >> Hi >> >> I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: >> >>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)" >>> on the iteration. That will eliminate the fragmentation issue, at least. >> >> Unfortunately, you cannot set this when using graph.run(new PageRank(...)) >> >> I created a Gist which shows you how to set this using PageRank >> >> https://gist.github.com/s1ck/801a8ef97ce374b358df >> >> Please let us know if it worked out for you. >> >> Cheers, >> Martin >> >> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E >> >> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: >>> Hi, >>> >>> While running PageRank on a synthetic graph I run into this problem: >>> Any advice on how should I proceed to overcome this memory issue? >>> >>> IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ >>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> Thanks! >>> >>> Best, >>> Ovidiu >>> > > |
This problem is surprising as I was able to run PR and CC on a larger graph (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran out of memory; regarding configuration (memory and parallelism, other internals) I was using the same. There is some limitation somewhere I will try to understand more what is happening. Best, Ovidiu > On 14 Mar 2016, at 18:06, Martin Junghanns <[hidden email]> wrote: > > Hi, > > I understand the confusion. So far, I did not run into the problem, but I think this needs to be adressed as all our graph processing abstractions are implemented on top of the delta iteration. > > According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. > > If this is the still the case, we should open an issue for that. Any further opinions on that? > > Cheers, > Martin > > > On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote: >> Thank you for this alternative. >> I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph. >> >> Running Connected Components on the same graph gives the same problem. >> >> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED >> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 >> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >> at java.lang.Thread.run(Thread.java:745) >> >> Best, >> Ovidiu >> >>> On 14 Mar 2016, at 17:36, Martin Junghanns <[hidden email]> wrote: >>> >>> Hi >>> >>> I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: >>> >>>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)" >>>> on the iteration. That will eliminate the fragmentation issue, at least. >>> >>> Unfortunately, you cannot set this when using graph.run(new PageRank(...)) >>> >>> I created a Gist which shows you how to set this using PageRank >>> >>> https://gist.github.com/s1ck/801a8ef97ce374b358df >>> >>> Please let us know if it worked out for you. >>> >>> Cheers, >>> Martin >>> >>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E >>> >>> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: >>>> Hi, >>>> >>>> While running PageRank on a synthetic graph I run into this problem: >>>> Any advice on how should I proceed to overcome this memory issue? >>>> >>>> IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ >>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> Thanks! >>>> >>>> Best, >>>> Ovidiu >>>> >> >> |
Probably the limitation is that the number of keys is different in the
real and the synthetic data set respectively. Can you confirm this? The solution set for delta iterations is currently implemented as an in-memory hash table that works on managed memory segments, but is not spillable. – Ufuk On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU <[hidden email]> wrote: > > This problem is surprising as I was able to run PR and CC on a larger graph (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran out of memory; regarding configuration (memory and parallelism, other internals) I was using the same. > There is some limitation somewhere I will try to understand more what is happening. > > Best, > Ovidiu > >> On 14 Mar 2016, at 18:06, Martin Junghanns <[hidden email]> wrote: >> >> Hi, >> >> I understand the confusion. So far, I did not run into the problem, but I think this needs to be adressed as all our graph processing abstractions are implemented on top of the delta iteration. >> >> According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. >> >> If this is the still the case, we should open an issue for that. Any further opinions on that? >> >> Cheers, >> Martin >> >> >> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote: >>> Thank you for this alternative. >>> I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph. >>> >>> Running Connected Components on the same graph gives the same problem. >>> >>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED >>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>> at java.lang.Thread.run(Thread.java:745) >>> >>> Best, >>> Ovidiu >>> >>>> On 14 Mar 2016, at 17:36, Martin Junghanns <[hidden email]> wrote: >>>> >>>> Hi >>>> >>>> I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: >>>> >>>>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)" >>>>> on the iteration. That will eliminate the fragmentation issue, at least. >>>> >>>> Unfortunately, you cannot set this when using graph.run(new PageRank(...)) >>>> >>>> I created a Gist which shows you how to set this using PageRank >>>> >>>> https://gist.github.com/s1ck/801a8ef97ce374b358df >>>> >>>> Please let us know if it worked out for you. >>>> >>>> Cheers, >>>> Martin >>>> >>>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E >>>> >>>> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: >>>>> Hi, >>>>> >>>>> While running PageRank on a synthetic graph I run into this problem: >>>>> Any advice on how should I proceed to overcome this memory issue? >>>>> >>>>> IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ >>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>>> Thanks! >>>>> >>>>> Best, >>>>> Ovidiu >>>>> >>> >>> > |
Yes, largely different. I was expecting for the solution set to be spillable.
This is somehow very hard limitation, the layout of the data makes the difference. By contract, I am able to run successfully CC on the synthetic data but RDDs are persisted in memory or on disk. Best, Ovidiu > On 14 Mar 2016, at 18:48, Ufuk Celebi <[hidden email]> wrote: > > Probably the limitation is that the number of keys is different in the > real and the synthetic data set respectively. Can you confirm this? > > The solution set for delta iterations is currently implemented as an > in-memory hash table that works on managed memory segments, but is not > spillable. > > – Ufuk > > On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU > <[hidden email]> wrote: >> >> This problem is surprising as I was able to run PR and CC on a larger graph (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran out of memory; regarding configuration (memory and parallelism, other internals) I was using the same. >> There is some limitation somewhere I will try to understand more what is happening. >> >> Best, >> Ovidiu >> >>> On 14 Mar 2016, at 18:06, Martin Junghanns <[hidden email]> wrote: >>> >>> Hi, >>> >>> I understand the confusion. So far, I did not run into the problem, but I think this needs to be adressed as all our graph processing abstractions are implemented on top of the delta iteration. >>> >>> According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. >>> >>> If this is the still the case, we should open an issue for that. Any further opinions on that? >>> >>> Cheers, >>> Martin >>> >>> >>> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote: >>>> Thank you for this alternative. >>>> I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph. >>>> >>>> Running Connected Components on the same graph gives the same problem. >>>> >>>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED >>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>> at java.lang.Thread.run(Thread.java:745) >>>> >>>> Best, >>>> Ovidiu >>>> >>>>> On 14 Mar 2016, at 17:36, Martin Junghanns <[hidden email]> wrote: >>>>> >>>>> Hi >>>>> >>>>> I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: >>>>> >>>>>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)" >>>>>> on the iteration. That will eliminate the fragmentation issue, at least. >>>>> >>>>> Unfortunately, you cannot set this when using graph.run(new PageRank(...)) >>>>> >>>>> I created a Gist which shows you how to set this using PageRank >>>>> >>>>> https://gist.github.com/s1ck/801a8ef97ce374b358df >>>>> >>>>> Please let us know if it worked out for you. >>>>> >>>>> Cheers, >>>>> Martin >>>>> >>>>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E >>>>> >>>>> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: >>>>>> Hi, >>>>>> >>>>>> While running PageRank on a synthetic graph I run into this problem: >>>>>> Any advice on how should I proceed to overcome this memory issue? >>>>>> >>>>>> IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ >>>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 >>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>> >>>>>> Thanks! >>>>>> >>>>>> Best, >>>>>> Ovidiu >>>>>> >>>> >>>> >> |
Correction: successfully CC I am running is on top of your friend, Spark :)
Best, Ovidiu > On 14 Mar 2016, at 20:38, Ovidiu-Cristian MARCU <[hidden email]> wrote: > > Yes, largely different. I was expecting for the solution set to be spillable. > This is somehow very hard limitation, the layout of the data makes the difference. > > By contract, I am able to run successfully CC on the synthetic data but RDDs are persisted in memory or on disk. > > Best, > Ovidiu > >> On 14 Mar 2016, at 18:48, Ufuk Celebi <[hidden email]> wrote: >> >> Probably the limitation is that the number of keys is different in the >> real and the synthetic data set respectively. Can you confirm this? >> >> The solution set for delta iterations is currently implemented as an >> in-memory hash table that works on managed memory segments, but is not >> spillable. >> >> – Ufuk >> >> On Mon, Mar 14, 2016 at 6:30 PM, Ovidiu-Cristian MARCU >> <[hidden email]> wrote: >>> >>> This problem is surprising as I was able to run PR and CC on a larger graph (2bil edges) but with this synthetic graph (1bil edges groups of 10) I ran out of memory; regarding configuration (memory and parallelism, other internals) I was using the same. >>> There is some limitation somewhere I will try to understand more what is happening. >>> >>> Best, >>> Ovidiu >>> >>>> On 14 Mar 2016, at 18:06, Martin Junghanns <[hidden email]> wrote: >>>> >>>> Hi, >>>> >>>> I understand the confusion. So far, I did not run into the problem, but I think this needs to be adressed as all our graph processing abstractions are implemented on top of the delta iteration. >>>> >>>> According to the previous mailing list discussion, the problem is with the solution set and its missing ability to spill. >>>> >>>> If this is the still the case, we should open an issue for that. Any further opinions on that? >>>> >>>> Cheers, >>>> Martin >>>> >>>> >>>> On 14.03.2016 17:55, Ovidiu-Cristian MARCU wrote: >>>>> Thank you for this alternative. >>>>> I don’t understand how the workaround will fix this on systems with limited memory and maybe larger graph. >>>>> >>>>> Running Connected Components on the same graph gives the same problem. >>>>> >>>>> IterationHead(Unnamed Delta Iteration)(82/88) switched to FAILED >>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 31 maxPartition: 32 number of overflow segments: 417 bucketSize: 827 Overall memory: 149159936 Partition memory: 65601536 Message: Index: 32, Size: 31 >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>> at java.lang.Thread.run(Thread.java:745) >>>>> >>>>> Best, >>>>> Ovidiu >>>>> >>>>>> On 14 Mar 2016, at 17:36, Martin Junghanns <[hidden email]> wrote: >>>>>> >>>>>> Hi >>>>>> >>>>>> I think this is the same issue we had before on the list [1]. Stephan recommended the following workaround: >>>>>> >>>>>>> A possible workaround is to use the option "setSolutionSetUnmanaged(true)" >>>>>>> on the iteration. That will eliminate the fragmentation issue, at least. >>>>>> >>>>>> Unfortunately, you cannot set this when using graph.run(new PageRank(...)) >>>>>> >>>>>> I created a Gist which shows you how to set this using PageRank >>>>>> >>>>>> https://gist.github.com/s1ck/801a8ef97ce374b358df >>>>>> >>>>>> Please let us know if it worked out for you. >>>>>> >>>>>> Cheers, >>>>>> Martin >>>>>> >>>>>> [1] http://mail-archives.apache.org/mod_mbox/flink-user/201508.mbox/%3CCAELUF_ByPAB%2BPXWLemPzRH%3D-awATeSz4sGz4v9TmnvFku3%3Dx3A%40mail.gmail.com%3E >>>>>> >>>>>> On 14.03.2016 16:55, Ovidiu-Cristian MARCU wrote: >>>>>>> Hi, >>>>>>> >>>>>>> While running PageRank on a synthetic graph I run into this problem: >>>>>>> Any advice on how should I proceed to overcome this memory issue? >>>>>>> >>>>>>> IterationHead(Vertex-centric iteration (org.apache.flink.graph.library.PageRank$VertexRankUpdater@7712cae0 | org.apache.flink.graph.library.PageRank$RankMesseng$ >>>>>>> java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 32 minPartition: 24 maxPartition: 25 number of overflow segments: 328 bucketSize: 638 Overall memory: 115539968 Partition memory: 50659328 Message: Index: 25, Size: 24 >>>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:469) >>>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:414) >>>>>>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:325) >>>>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.readInitialSolutionSet(IterationHeadTask.java:212) >>>>>>> at org.apache.flink.runtime.iterative.task.IterationHeadTask.run(IterationHeadTask.java:273) >>>>>>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) >>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>> Thanks! >>>>>>> >>>>>>> Best, >>>>>>> Ovidiu >>>>>>> >>>>> >>>>> >>> > |
In reply to this post by Ufuk Celebi
Hi,
Regarding the solution set going out of memory, I would like an issue to be filled against it. Looking into code for CompactingHashTable I see The hash table is internally divided into two parts: The hash index, and the partition buffers that store the actual records. When records are inserted or updated, the hash table appends the records to its corresponding partition, and inserts or updates the entry in the hash index. In the case that the hash table runs out of memory, it compacts a partition by walking through the hash index and copying all reachable elements into a fresh partition. After that, it releases the memory of the partition to compact. It is not clear the expected behaviour when the hash table runs out of memory. If by contrast Spark is working on RDDs and they can be cached in memory or spilled to disk, something similar could be done for all the components currently built in memory and not being spilled to disk to avoid OutOfMemory. What do you think? Best, Ovidiu
|
Hi Ovidiu, putting the CompactingHashTable aside, all data structures and algorithms that use managed memory can spill to disk if data exceeds memory capacity. It was a conscious choice to not let the CompactingHashTable spill. Once the solution set hash table is spilled, (parts of) the hash table needs to be read and written in each iteration. This would have a very significant impact on the performance. So far the guideline was to add more machines if you run out of memory in a delta iteration to keep computation in-memory. 2016-03-16 8:14 GMT+01:00 Ovidiu-Cristian MARCU <[hidden email]>:
|
Free forum by Nabble | Edit this page |