Hi all,
I’ve been running a Beam pipeline on Flink. Depending on the dataset size and the heap memory configuration of the jobmanager and taskmanager, I may run into an EOFException, which causes the job to fail. You will find the stacktrace near the bottom of this post (data censored).

I would not expect such an abrupt failure once the dataset grows beyond a certain size. Doesn’t Flink spill data over to disk when memory runs out? How do I deal with this unpredictable behaviour in a production situation?

I’m running a clean Flink 1.3.2 with a heap of 768 MiB; the dataset size is in the tens of megabytes. The same root EOFException occurred on Flink 1.2.1. I will gladly provide more information where needed.

If this is expected behaviour, I feel it should be documented: a more informative exception message, and managing user expectations in the guides. (I have not been able to find any information regarding this exception.)

Hoping that someone can enlighten me,

Reinier

08/30/2017 13:48:33    GroupReduce (GroupReduce at GroupByKey)(1/1) switched to FAILED
java.lang.Exception: The data preparation for task 'GroupReduce (GroupReduce at GroupByKey)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:466)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record FakeSerialisableObjectWithStringsAndDoubles{}
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:129)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:48)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:76)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:60)
    at org.apache.beam.sdk.transforms.join.UnionCoder.encode(UnionCoder.java:33)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:99)
    at org.apache.beam.sdk.coders.IterableLikeCoder.encode(IterableLikeCoder.java:60)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:135)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:652)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:641)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:599)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:80)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:125)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.write(NormalizedKeySorter.java:281)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1037)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.nextSegment(SimpleCollectingOutputView.java:79)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:190)
    at org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper.write(DataOutputViewWrapper.java:49)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1286)
    at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1231)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1427)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1577)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:351)
    at org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:126)
    ... 19 more
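For reference, the failing GroupByKey sits in a pipeline of roughly the shape sketched below. This is a minimal, made-up reconstruction, not my actual job: the source, key names and class are placeholders, and FakeSerialisableObjectWithStringsAndDoubles stands in for my real value type, which is a plain Serializable POJO, so Beam encodes it with SerializableCoder (matching the stack trace).

    import java.io.Serializable;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class GroupByKeySketch {

      /** Stand-in for the real value type: a plain Serializable POJO, encoded by SerializableCoder. */
      public static class FakeSerialisableObjectWithStringsAndDoubles implements Serializable {
        String label;
        double value;
      }

      public static void main(String[] args) {
        // Pass --runner=FlinkRunner (plus the usual Flink options) to run on the cluster.
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        FakeSerialisableObjectWithStringsAndDoubles record =
            new FakeSerialisableObjectWithStringsAndDoubles();

        // The real job reads many such records; Create.of is only a placeholder source.
        p.apply(Create.of(KV.of("someKey", record))
                .withCoder(KvCoder.of(
                    StringUtf8Coder.of(),
                    SerializableCoder.of(FakeSerialisableObjectWithStringsAndDoubles.class))))
            // This GroupByKey is translated into the GroupReduce that fails in the log above.
            .apply(GroupByKey.<String, FakeSerialisableObjectWithStringsAndDoubles>create())
            .apply(ParDo.of(
                new DoFn<KV<String, Iterable<FakeSerialisableObjectWithStringsAndDoubles>>, Long>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    // Placeholder reduction: just count the grouped values per key.
                    long count = 0;
                    for (FakeSerialisableObjectWithStringsAndDoubles ignored : c.element().getValue()) {
                      count++;
                    }
                    c.output(count);
                  }
                }));

        p.run();
      }
    }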
Hi Reinier,

This is in fact a bug that you stumbled upon. In general, Flink works very well with large data sets and little memory, and gracefully spills data to disk.
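For reference, the memory available to the sorter and the directories it spills to are controlled by a handful of settings in flink-conf.yaml. A rough sketch for a standalone 1.3.x setup follows; the values are purely illustrative and changing them does not work around the bug itself:

    # flink-conf.yaml (Flink 1.3.x-style keys; illustrative values only)
    jobmanager.heap.mb: 768           # JVM heap of the jobmanager
    taskmanager.heap.mb: 768          # JVM heap of each taskmanager
    taskmanager.memory.fraction: 0.7  # share of the free heap reserved as managed memory (used by sorting/hashing)
    taskmanager.tmp.dirs: /tmp/flink  # directories used for spill files when managed memory is exhausted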