Cannot write record to fresh sort buffer. Record too large.

Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi,

when I run my Flink job on a small dataset, it finishes successfully.
However, when a bigger dataset is used, I get multiple exceptions:

-  Caused by: java.io.IOException: Cannot write record to fresh sort
buffer. Record too large.
- Thread 'SortMerger Reading Thread' terminated due to an exception: null

A full stack trace can be found here [0].

I tried to reduce taskmanager.memory.fraction (and similar options) and also
the degree of parallelism, but that did not help much.

Flink 1.0.3-Hadoop2.7 was used.

Any tips are appreciated.

Kind regards,
Sebastian

[0]:
http://paste.gehaxelt.in/?1f24d0da3856480d#/dR8yriXd/VQn5zTfZACS52eWiH703bJbSTZSifegwI=

Re: Cannot write record to fresh sort buffer. Record too large.

Stefan Richter
Hi,

can you please take a look at your TM logs? I would expect that you will see a java.lang.OutOfMemoryError there.

If this assumption is correct, you can try to:

1. Further decrease taskmanager.memory.fraction: this will cause the TaskManager to allocate less managed memory and leave more free heap memory available (a small configuration sketch follows below).
2. Decrease the number of slots on the TaskManager: this will decrease the number of concurrently running user functions and thus the number of objects that have to be kept on the heap.
3. Increase the number of ALS blocks `als.setBlocks(numberBlocks)`: this will increase the number of blocks into which the factor matrices are split up. A larger number means that each individual block is smaller and thus needs less memory on the heap.
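For illustration, a minimal sketch of how the first two settings could look for a local test run. The concrete values are just placeholders, not recommendations; on a cluster the same keys go into flink-conf.yaml, and point 3 refers to the FlinkML ALS API.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class MemoryTuningSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // 1. less managed memory for sorting/hashing, more free heap for user objects
        config.setFloat("taskmanager.memory.fraction", 0.5f);
        // 2. fewer slots, i.e. fewer concurrently running user functions per TaskManager
        config.setInteger("taskmanager.numberOfTaskSlots", 1);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
        // ... build and execute the job on env as usual
    }
}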

Best,
Stefan


Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi Stefan,

thanks for the answer and the advice, which I've already seen in another
email.

Anyway, I played around with the taskmanager.numberOfTaskSlots and
taskmanager.memory.fraction options. I noticed that decreasing the
former and increasing the latter led to longer execution times and more
processed data before the failure.

The error messages and exceptions from an affected TaskManager are here
[1]. Unfortunately, I cannot find a java.lang.OutOfMemoryError in there.

Do you have another idea or something to try?

Thanks in advance,
Sebastian


[1]
http://paste.gehaxelt.in/?e669fabc1d4c15be#G1Ioq/ASwGUdCaK2rQ1AY3ZmCkA7LN4xVOHvM9NeI2g=

Re: Cannot write record to fresh sort buffer. Record too large.

Flavio Pompermaier
Try to check whether the output of the dmesg command contains any log entries about an OOM kill; the OS logs such information there. I had a similar experience recently... see [1]


Re: Cannot write record to fresh sort buffer. Record too large.

Ted Yu
Sebastian:
Are you using JDK 7 or JDK 8?

For JDK 7, there was a bug w.r.t. the code cache getting full, which affects performance.


Cheers


Re: Cannot write record to fresh sort buffer. Record too large.

Kurt Young
In reply to this post by Sebastian Neef
Hi,

I think the reason is that your records are too large to do an in-memory combine. You can try to disable your combiner; one possible way is sketched below.
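For illustration, a rough sketch of one way to do that (not the only one, and not taken from your job): deduplicate with a plain GroupReduceFunction instead of distinct(). Since the function below does not implement GroupCombineFunction, Flink will not chain a combiner in front of it.

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class DistinctWithoutCombiner {

    // Replacement for pages.distinct() that avoids the chained combine step.
    public static DataSet<String> distinctNoCombine(DataSet<String> pages) {
        return pages
            .groupBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value; // group by the full record content
                }
            })
            .reduceGroup(new GroupReduceFunction<String, String>() {
                @Override
                public void reduce(Iterable<String> values, Collector<String> out) {
                    out.collect(values.iterator().next()); // keep one record per group
                }
            });
    }
}

Whether this is feasible depends on the data volume, since the full deduplication then happens on the reduce side.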

Best,
Kurt


Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi Kurt,

thanks for the input.

What do you mean by "try to disable your combiner"? Any tips on how I
can do that?

I don't actively use any combine* DataSet API functions, so the calls to
the SynchronousChainedCombineDriver come from Flink.

Kind regards,
Sebastian

Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
In reply to this post by Ted Yu
Hi Ted,

thanks for bringing this to my attention.

I just rechecked my Java version and it is indeed version 8. Both the
code and the Flink environment run that version.

Cheers,
Sebastian

Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
In reply to this post by Flavio Pompermaier
Hi Flavio,

thanks for pointing me to your old thread.

I don't have administrative rights on the cluster, but from what dmesg
reports, I could not find anything that looks like an OOM message.

So no luck for me, I guess...

Best,
Sebastian

Re: Cannot write record to fresh sort buffer. Record too large.

Kurt Young
In reply to this post by Sebastian Neef
Hi,

Can you paste a code snippet to show how you use the DataSet API?

Best,
Kurt


Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi,

the code is part of a bigger project, so I'll try to outline the methods
used and their order:

# Step 1
- Reading a Wikipedia XML dump into a DataSet of <page>-tag delimited strings using XmlInputFormat.
- A .distinct() operation removes all duplicates based on the content.
- .map() is used to parse the XML data into a DataSet of Page POJOs:
-- Each Page has an ArrayList of extracted Revision POJOs.
-- A Revision has text content.
-- A Revision also has a Contributor POJO.

# Step 2
- Obtaining three separate DataSets with a .flatMap():
-- DataSet of Page objects
-- DataSet of Revision objects
-- DataSet of Contributor objects
- .map() on the Revisions to obtain Score objects. A Score has two fields: a pointer to an object and a double value.
-- Assigns a value of 1.0 to each Revision.
- .groupBy() with a custom KeySelector, followed by .reduce() to get the accumulated scores.
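A simplified, self-contained sketch of that shape (the POJOs, field names, and the placeholder source are made up for illustration; the real code reads the dump via XmlInputFormat and is more involved):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.util.Collector;

public class PipelineSketch {

    // minimal stand-ins for the real POJOs
    public static class Revision { public String text; public String contributor; }
    public static class Page { public String title; public List<Revision> revisions = new ArrayList<>(); }
    public static class Score {
        public String key;
        public double value;
        public Score() {}
        public Score(String key, double value) { this.key = key; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Step 1: <page>-delimited strings from the dump (the XmlInputFormat-based
        // source is replaced by a placeholder here), dedup, parse into Page POJOs.
        DataSet<String> rawPages = env.fromElements("<page>...</page>");
        DataSet<Page> pages = rawPages
            .distinct()
            .map(PipelineSketch::parsePage).returns(Page.class);

        // Step 2: explode into Revisions, assign a score of 1.0 per revision,
        // then group by a custom key and sum the scores.
        DataSet<Revision> revisions = pages
            .flatMap((FlatMapFunction<Page, Revision>) (page, out) -> {
                for (Revision r : page.revisions) {
                    out.collect(r);
                }
            }).returns(Revision.class);

        DataSet<Score> accumulated = revisions
            .map(r -> new Score(r.contributor, 1.0)).returns(Score.class)
            .groupBy(new KeySelector<Score, String>() {
                @Override
                public String getKey(Score s) { return s.key; }
            })
            .reduce((ReduceFunction<Score>) (a, b) -> new Score(a.key, a.value + b.value));

        accumulated.print();
    }

    private static Page parsePage(String xml) {
        return new Page(); // placeholder for the real XML parsing
    }
}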

I'm not sure if that helps much, but that's it in essence. Looking
more closely at the traces, I believe that the .distinct() could be the
culprit:

> 2017-06-12 21:33:14,016 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN DataSource (at createInput(ExecutionEnvironment.java:552) (org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat)) -> Combine(Distinct at parseDumpData(SkipDumpParser.java:43)) (28/40)
> java.io.IOException: Cannot write record to fresh sort buffer. Record too large.

Best,
Sebastian

Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
In reply to this post by Kurt Young
Hi,

I removed the .distinct() and ran another test.

Without filtering duplicate entries, the job processes more data and
runs much longer, but eventually fails with the following error:

> java.lang.OutOfMemoryError: Requested array size exceeds VM limit

Even then, playing around with the aforementioned Flink settings does not
resolve the problem.

I guess, I need to debug this some more.

Best,
Sebastian

Re: Cannot write record to fresh sort buffer. Record too large.

Ted Yu
For the 'Requested array size exceeds VM limit' error, can you pastebin the full stack trace?

Thanks


Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi Ted,

sure.

Here's the stack trace with .distinct(), with the exception in the
'SortMerger Reading Thread': [1]

Here's the stack trace without .distinct(), with the 'Requested array
size exceeds VM limit' error: [2]

If you need anything else, I can more or less reliably reproduce the issue.

The best,
Sebastian

[1]
http://paste.gehaxelt.in/?2757c33ed3a3733b#jHQPPQNKKrE2wq4o9KCR48m+/V91S55kWH3dwEuyAkc=
[2]
http://paste.gehaxelt.in/?b106990deccecf1a#y22HgySqCYEOaP2wN6xxApGk/r4YICRkLCH2HBNN9yQ=

Re: Cannot write record to fresh sort buffer. Record too large.

Ted Yu
For #2, XmlInputFormat was involved.

Is it possible to prune (unneeded) fields so that the heap requirement is lower?

On Wed, Jun 14, 2017 at 8:47 AM, Sebastian Neef <[hidden email]> wrote:
Hi Ted,

sure.

Here's the stack strace with .distinct() with the Exception in the
'SortMerger Reading Thread': [1]

Here's the stack strace without .distinct() and the 'Requested array
size exceeds VM limit' error: [2]

If you need anything else, I can more or less reliably reproduce the issue.

The best,
Sebastian

[1]
http://paste.gehaxelt.in/?2757c33ed3a3733b#jHQPPQNKKrE2wq4o9KCR48m+/V91S55kWH3dwEuyAkc=
[2]
http://paste.gehaxelt.in/?b106990deccecf1a#y22HgySqCYEOaP2wN6xxApGk/r4YICRkLCH2HBNN9yQ=

Reply | Threaded
Open this post in threaded view
|

Re: Cannot write record to fresh sort buffer. Record too large.

Stephan Ewen
Here are some pointers:

  - You would rather need MORE managed memory, not less, because the sorter uses that.

  - We added the "large record handler" to the sorter for exactly these use cases. Can you check in the code whether it is enabled? You'll have to go through a bit of the code to see that. It is an older Flink version, and I am not quite sure any more how exactly it worked there.
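If memory serves, the switch looks roughly like the sketch below; the key name is recalled from memory and should be verified against the ConfigConstants of your Flink version before relying on it.

import org.apache.flink.configuration.Configuration;

public class LargeRecordHandlerSwitch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Key name recalled from memory, NOT verified against Flink 1.0.3 --
        // double-check org.apache.flink.configuration.ConfigConstants.
        config.setBoolean("taskmanager.runtime.large-record-handler", true);
        System.out.println(config);
    }
}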

Stephan



Re: Cannot write record to fresh sort buffer. Record too large.

Kurt Young
I think the only way is adding more managed memory.
The large record handler only takes effect on the reduce side, which is used by the merge sorter. According to the exception, it is thrown during the combine phase, which only uses an in-memory sorter that does not have the large-record-handling mechanism.

Best,
Kurt


Re: Cannot write record to fresh sort buffer. Record too large.

Sebastian Neef
Hi,

@Ted:

> Is it possible to prune (unneeded) fields so that the heap requirement is
> lower?

The XmlInputFormat [0] splits the raw data into smaller chunks, which
are then processed further. I don't think I can reduce the sizes of the
fields (Tuple2<LongWritable, Text>). The major difference to Mahout's
XmlInputFormat is the support for compressed files, which Mahout's
version does not seem to have [1].
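One thing that might still help, sketched below with hypothetical helper names: drop the LongWritable offset and cut each record down to the parts needed downstream directly after the source, before any sorting takes place.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class EarlyPruning {

    // Prune each raw record before .distinct()/sorting sees it.
    public static DataSet<String> prune(DataSet<Tuple2<LongWritable, Text>> rawPages) {
        return rawPages.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String map(Tuple2<LongWritable, Text> record) {
                // drop the LongWritable offset, keep only the needed parts of the <page> element
                return extractNeededParts(record.f1.toString());
            }
        });
    }

    // Hypothetical placeholder for project-specific pruning logic.
    private static String extractNeededParts(String pageXml) {
        return pageXml;
    }
}

This would not shrink a single pathologically large <page> element, but it would at least keep unneeded bytes out of the sort buffers.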

@Stephan, @Kurt

>   - You would rather need MORE managed memory, not less, because the sorter
> uses that.

> I think the only way is adding more managed memory.

Ah, okay. It seems like I misunderstood that, but I tested with up to 0.8
of a 46 GB RAM allocation anyway. Does that mean that I have to scale the
amount of RAM proportionally to the dataset's size in this case? I'd have
expected Flink to start spilling to disk and slowing down instead?

>  - We added the "large record handler" to the sorter for exactly these use
> cases.

Okay, so spilling to disk is theoretically possible and the crashes
should not occur then?

>  [...] it is thrown during the combine phase, which only uses an in-memory sorter that does not have the large-record-handling mechanism.

Are there ways to circumvent this restriction (sorting step?) or
otherwise optimize the process?

> Can you check in the code whether it is enabled? You'll have to go
> through a bit of the code to see that.

Although I'm not deeply familiar with Flink's internal source code, I'll
try my best to figure that out.

Thanks,
Sebastian

[0]
http://paste.gehaxelt.in/?336f8247fa50171e#DSH0poFcVIR29X7lb98qRhUG/jrkKkUrfkUs7ECSyeE=
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-compressed-XML-data-td10985.html