Hi,
when I'm running my Flink job on a small dataset, it successfully finishes. However, when a bigger dataset is used, I get multiple exceptions: - Caused by: java.io.IOException: Cannot write record to fresh sort buffer. Record too large. - Thread 'SortMerger Reading Thread' terminated due to an exception: null A full stack trace can be found here [0]. I tried to reduce the taskmanager.memory.fraction (or so) and also the amount of parallelism, but that did not help much. Flink 1.0.3-Hadoop2.7 was used. Any tipps are appreciated. Kind regards, Sebastian [0]: http://paste.gehaxelt.in/?1f24d0da3856480d#/dR8yriXd/VQn5zTfZACS52eWiH703bJbSTZSifegwI= |
Hi,
can you please take a look at your TM logs? I would expect that you can see an java.lang.OutOfMemoryError there. If this assumption is correct, you can try to: 1. Further decrease the taskmanager.memory.fraction: This will cause the TaskManager to allocate less memory for managed memory and leaves more free heap memory available 2. Decrease the number of slots on the TaskManager: This will decrease the number of concurrently running user functions and thus the number of objects which have to be kept on the heap. 3. Increase the number of ALS blocks `als.setBlocks(numberBlocks)`. This will increase the number of blocks into which the factor matrices are split up. A larger number means that each individual block is smaller and thus will need fewer memory to be kept on the heap. Best, Stefan > Am 12.06.2017 um 15:55 schrieb Sebastian Neef <[hidden email]>: > > Hi, > > when I'm running my Flink job on a small dataset, it successfully > finishes. However, when a bigger dataset is used, I get multiple exceptions: > > - Caused by: java.io.IOException: Cannot write record to fresh sort > buffer. Record too large. > - Thread 'SortMerger Reading Thread' terminated due to an exception: null > > A full stack trace can be found here [0]. > > I tried to reduce the taskmanager.memory.fraction (or so) and also the > amount of parallelism, but that did not help much. > > Flink 1.0.3-Hadoop2.7 was used. > > Any tipps are appreciated. > > Kind regards, > Sebastian > > [0]: > http://paste.gehaxelt.in/?1f24d0da3856480d#/dR8yriXd/VQn5zTfZACS52eWiH703bJbSTZSifegwI= |
Hi Stefan,
thanks for the answer and the advise, which I've already seen in another email. Anyway, I played around with the taskmanager.numberOfTaskSlots and taskmanager.memory.fraction options. I noticed that decreasing the former and increasing the latter lead to longer execution and more processed data before the failure. The error messages and exceptions from an affected TaskManager are here [1]. Unfortunately, I cannot find a java.lang.OutOfMemoryError in here. Do you have another idea or something to try? Thanks in advance, Sebastian [1] http://paste.gehaxelt.in/?e669fabc1d4c15be#G1Ioq/ASwGUdCaK2rQ1AY3ZmCkA7LN4xVOHvM9NeI2g= |
Try to see of in the output of dmesg command there are some log about an OOM. The OS logs there such info. I had a similar experience recently... see [1] On 12 Jun 2017 21:51, "Sebastian Neef" <[hidden email]> wrote: Hi Stefan, |
Sebastian: Are you using jdk 7 or jdk 8 ? For jdk 7, there was bug w.r.t. code cache getting full which affects performance. Cheers On Mon, Jun 12, 2017 at 1:08 PM, Flavio Pompermaier <[hidden email]> wrote:
|
In reply to this post by Sebastian Neef
Hi, I think the reason is your record is too large to do a in-memory combine. You can try to disable your combiner. Best, Kurt On Mon, Jun 12, 2017 at 9:55 PM, Sebastian Neef <[hidden email]> wrote: Hi, |
Hi Kurt,
thanks for the input. What do you mean with "try to disable your combiner"? Any tips on how I can do that? I don't actively use any combine* DataSet API functions, so the calls to the SynchronousChainedCombineDriver come from Flink. Kind regards, Sebastian |
In reply to this post by Ted Yu
Hi Ted,
thanks for bringing this to my attention. I just rechecked my Java version and it is indeed version 8. Both the code and the Flink environment run that version. Cheers, Sebastian |
In reply to this post by Flavio Pompermaier
Hi Flavio,
thanks for pointing me to your old thread. I don't have administrative rights on the cluster, but from what dmesg reports, I could not find anything that looks like an OOM message. So no luck for me, I guess... Best, Sebastian |
In reply to this post by Sebastian Neef
Hi, Can you paste some code snippet to show how you use the DataSet API? Best, Kurt On Tue, Jun 13, 2017 at 4:29 PM, Sebastian Neef <[hidden email]> wrote: Hi Kurt, |
Hi,
the code is part of a bigger project, so I'll try to outline the used methods and their order: # Step 1 - Reading a Wikipedia XML Dump into a DataSet of <page>-tag delimited strings using XmlInputFormat. - A .distinct() operations removes all duplicates based on the content. - .map() is used to parse the XML data into a DataSet of Page-POJOs: -- It has an ArrayList of extracted Revision POJOs. -- A Revision has text content. -- A Revision also has a Contributor POJO. # Step 2 - Obtaining three separate DataSets with a .flatMap() -- DataSet of Page objects -- DataSet of Revision objects -- DataSet of Contributor objects - .map() on the Revisions to obtain Score-Objects. Score-Objects have two fields: A pointer to an object and a double value. -- Assign value of "1.0" for each Revision. - .groupBy() with a custom KeySelector, followed by .reduce() to get the accumulated scores. I'm not sure, if that helps much, but that is it in essence. Looking more closely on the traces, I believe that the .distinct() could be the culprit? > 2017-06-12 21:33:14,016 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN DataSource (at createInput(ExecutionEnvironment.java:552) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Combine(Distinct at parseDumpData(SkipDumpParser.java:43)) (28/40) > java.io.IOException: Cannot write record to fresh sort buffer. Record too large. Best, Sebastian |
In reply to this post by Kurt Young
Hi,
I removed the .distinct() and ran another test. Without filtering duplicate entries, the Job processes more data and runs much longer, but eventually fails with the following error: > java.lang.OutOfMemoryError: Requested array size exceeds VM limit Even then playing around with the aforementioned Flink settings does not resolve the problem. I guess, I need to debug this some more. Best, Sebastian |
For the 'Requested array size exceeds VM limit' error, can you pastebin the full stack trace ? Thanks On Wed, Jun 14, 2017 at 3:22 AM, Sebastian Neef <[hidden email]> wrote: Hi, |
Hi Ted,
sure. Here's the stack strace with .distinct() with the Exception in the 'SortMerger Reading Thread': [1] Here's the stack strace without .distinct() and the 'Requested array size exceeds VM limit' error: [2] If you need anything else, I can more or less reliably reproduce the issue. The best, Sebastian [1] http://paste.gehaxelt.in/?2757c33ed3a3733b#jHQPPQNKKrE2wq4o9KCR48m+/V91S55kWH3dwEuyAkc= [2] http://paste.gehaxelt.in/?b106990deccecf1a#y22HgySqCYEOaP2wN6xxApGk/r4YICRkLCH2HBNN9yQ= |
For #2, XmlInputFormat was involved. Is it possible to prune (unneeded) field(s) so that heap requirement is lower ? On Wed, Jun 14, 2017 at 8:47 AM, Sebastian Neef <[hidden email]> wrote: Hi Ted, |
Here are some pointers
- You would rather need MORE managed memory, not less, because the sorter uses that. - We added the "large record handler" to the sorter for exactly these use cases. Can you check in the code whether it is enabled? You'll have to go through a bit of the code to see that. It is an older Flink version, I am not quite sure any more how exactly it was there. Stephan On Wed, Jun 14, 2017 at 8:59 PM, Ted Yu <[hidden email]> wrote:
|
I think the only way is adding more managed memory. The large record handler only take effects in reduce side which used by the merge sorter. According to the exception, it is thrown during the combing phase which only uses an in-memory sorter, which doesn't have large record handle mechanism. Best, Kurt On Thu, Jun 15, 2017 at 2:33 PM, Stephan Ewen <[hidden email]> wrote:
|
Hi,
@Ted: > Is it possible to prune (unneeded) field(s) so that heap requirement is > lower ? The XmlInputFormat [0] splits the raw data into smaller chunks, which are then further processed. I don't think I can reduce the field's (Tuple2<LongWritable, Text>) sizes. The major difference to Mahout's XmlInputFormat is the compressed file support, which does not seem to exist [1]. @Stephan, @Kurt > - You would rather need MORE managed memory, not less, because the sorter > uses that. > I think the only way is adding more managed memor Ah, okay. Seems like I misunderstood it, but tested with up to 0.8 of a 46 GB RAM allocation anyway. Does that mean, that I have to scale the amount of RAM proportionally to the dataset's size in this case? I'd expected Flink to start caching and slowing down? > - We added the "large record handler" to the sorter for exactly these use > cases. Okay, so spilling to disk is theoretically possible and the crashes should not occur then? > [...] it is thrown during the combing phase which only uses an in-memory sorter, which doesn't have large record handle mechanism. Are there ways to circumvent this restriction (sorting step?) or otherwise optimize the process? > Can you check in the code whether it is enabled? You'll have to go > through a bit of the code to see that. Although, I'm not deeply involved with Flink's internal sourcecode, I'll try my best to figure that out. Thanks, Sebastian [0] http://paste.gehaxelt.in/?336f8247fa50171e#DSH0poFcVIR29X7lb98qRhUG/jrkKkUrfkUs7ECSyeE= [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-compressed-XML-data-td10985.html |
Free forum by Nabble | Edit this page |