EOFException related to memory segments during run of Beam pipeline on Flink

Reinier Kip
Hi all,

I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail. You will find the stacktrace near the bottom of this post (data censored).

I would not expect such a sudden failure as the dataset apparently grows above a certain size. Doesn’t Flink spill data over to disk when memory runs out? How do I deal with this unpredictable behaviour in a production situation? I’m running a clean Flink 1.3.2 with heap memory of 768MiB. The dataset size is in the tens of megabytes. The same root EOFException occurred in Flink 1.2.1. I will gladly provide more information where needed.

If this is expected behaviour, I feel it should be documented, meaning a more informative exception message, and managing user expectations in the guides. (I have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier


08/30/2017 13:48:33     GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
        at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
        at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
        at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
        at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
        at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
        at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:49)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
        at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
        at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:126)
        ... 19 more


Re: EOFException related to memory segments during run of Beam pipeline on Flink

Fabian Hueske-2
Hi Reinier,

this is in fact a bug that you stumbled upon.
In general, Flink works very well with larger data sets and little memory and gracefully spills data to disk.
The problem in your case is caused by a wrapped exception.

Internally, Flink uses an EOFException to signal that the memory pool is exhausted (in your case thrown by SimpleCollectingOutputView.nextSegment()).
Unfortunately, Beam's SerializableCoder wraps this exception in a CoderException, which is unknown to Flink and is therefore not detected as an EOFException by NormalizedKeySorter.write() (line 283).
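
To illustrate the mechanism, here is a minimal, standard-library-only sketch. It is not Flink's or Beam's actual code: WrappedEofDemo, Serializer, WrappingException, fullBuffer() and wrapping() are hypothetical stand-ins, and the shape of write() is an assumption based on the description above (an EOFException means "buffer full", anything else is a real failure).

```java
import java.io.EOFException;
import java.io.IOException;

class WrappedEofDemo {

    /** Hypothetical stand-in for a coder exception; like Beam's CoderException it is an IOException. */
    static class WrappingException extends IOException {
        WrappingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical stand-in for a record serializer writing into a bounded buffer. */
    interface Serializer {
        void serialize(String record) throws IOException;
    }

    /** Assumed shape of the sorter's write step: EOFException is read as "buffer full". */
    static boolean write(Serializer serializer, String record) throws IOException {
        try {
            serializer.serialize(record);
            return true;
        } catch (EOFException e) {
            return false; // the caller would spill to disk and retry
        }
        // Any other IOException propagates and fails the task.
    }

    /** A serializer whose target buffer is always exhausted. */
    static Serializer fullBuffer() {
        return record -> {
            throw new EOFException();
        };
    }

    /** Wraps every IOException of the delegate, hiding the EOFException as a cause. */
    static Serializer wrapping(Serializer delegate) {
        return record -> {
            try {
                delegate.serialize(record);
            } catch (IOException e) {
                throw new WrappingException("unable to serialize record " + record, e);
            }
        };
    }

    public static void main(String[] args) throws IOException {
        // The plain EOFException is caught and turned into "buffer full".
        System.out.println("unwrapped: write returned " + write(fullBuffer(), "a"));
        try {
            write(wrapping(fullBuffer()), "a");
        } catch (IOException e) {
            // The wrapped variant skips the EOFException handler entirely.
            System.out.println("wrapped: task fails with " + e.getClass().getSimpleName());
        }
    }
}
```

The catch clause matches on the exception type only, so an EOFException that sits in the cause chain of another IOException never reaches it.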

I think it is debatable whether this is an issue of Beam's Flink runner or Flink itself.
In any case, it would be good if you could open an issue at Beam's JIRA to track this problem.
A quick solution for your use case could be to use a custom coder that forwards the EOFException instead of wrapping it.
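
A rough sketch of what "forwarding" could look like, using only the Java standard library. EofForwarding, Encoder, findEof() and forwardingEof() are hypothetical names, not Beam API; in a real pipeline the same unwrapping logic would presumably live in the encode method of a custom Beam coder that delegates to the existing one.

```java
import java.io.EOFException;
import java.io.IOException;

class EofForwarding {

    /** Hypothetical minimal stand-in for a coder's encode method. */
    interface Encoder<T> {
        void encode(T value) throws IOException;
    }

    /** Returns the first EOFException in the cause chain of e, or null if there is none. */
    static EOFException findEof(Throwable e) {
        for (Throwable t = e; t != null; t = t.getCause()) {
            if (t instanceof EOFException) {
                return (EOFException) t;
            }
        }
        return null;
    }

    /** Decorates a delegate so that a wrapped EOFException is rethrown as-is. */
    static <T> Encoder<T> forwardingEof(Encoder<T> delegate) {
        return value -> {
            try {
                delegate.encode(value);
            } catch (IOException e) {
                EOFException eof = findEof(e);
                // Forward the original EOFException if present, otherwise keep the failure unchanged.
                throw eof != null ? eof : e;
            }
        };
    }

    public static void main(String[] args) {
        // A delegate that behaves like a wrapping coder writing into an exhausted buffer.
        Encoder<String> wrappingDelegate = value -> {
            throw new IOException("unable to serialize record " + value, new EOFException());
        };
        try {
            forwardingEof(wrappingDelegate).encode("a");
        } catch (IOException e) {
            System.out.println("caller sees EOFException: " + (e instanceof EOFException));
        }
    }
}
```

Exceptions without an EOFException in their cause chain are rethrown unchanged, so genuine serialization errors still fail the job.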

Best, Fabian
