Hello everyone,

I have a Flink batch job that reads four CSV files. The rows in the files are read and grouped together. When the four CSV files are small enough, the job finishes successfully. However, when the input files are large, the job fails with the exception shown below.

Could somebody please help me fix this problem?

Best regards,
Ferry

2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
    at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
    at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
    at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
    at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
    at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask - Releasing all broadcast variables.: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Spilling buffer 0.
2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    at java.lang.Thread.join(Thread.java:1323)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
    at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
    at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger - Emitting final buffer from reader thread: 1.
Hi,

Can you share your input file to help us debug the problem?

Best,
Kurt

On Thu, Mar 30, 2017 at 12:00 AM, Sapei, Ferry Syafei <[hidden email]> wrote: