GroupReduce is interrupted on reading large CSV files

fsapei

Hello everyone,

I have a Flink batch job which reads four CSV files. The rows in the files are read and grouped together.
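
For context, a keyed GroupReduce in Flink's batch runtime is executed by sorting the input on the grouping key and then handing each run of equal keys to the reduce function. The trace below shows this: GroupReduceDriver.prepare obtains its input from the UnilateralSortMerger. The plain-Java sketch below mimics only that sort-then-group step. The row layout (key in column 0, a decimal amount in column 1) and the summing reduce are hypothetical, because the post does not show the actual schema or reduce logic.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedGroupReduceSketch {

    // Hypothetical row layout: grouping key plus one decimal amount.
    record Row(String key, BigDecimal amount) {}

    // Parses hypothetical "key,amount" lines; stands in for the real CSV schema.
    static List<Row> parse(List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            rows.add(new Row(fields[0].trim(), new BigDecimal(fields[1].trim())));
        }
        return rows;
    }

    // Sorts by key, then reduces each contiguous run of equal keys (here: a sum).
    static List<Row> groupReduce(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing(Row::key));
        List<Row> out = new ArrayList<>();
        int i = 0;
        while (i < sorted.size()) {
            String key = sorted.get(i).key();
            BigDecimal sum = BigDecimal.ZERO;
            // Consume every row that shares the current key.
            while (i < sorted.size() && sorted.get(i).key().equals(key)) {
                sum = sum.add(sorted.get(i).amount());
                i++;
            }
            out.add(new Row(key, sum));
        }
        return out;
    }

    public static void main(String[] args) {
        // Four tiny in-memory "files" standing in for the four CSV inputs.
        List<String> allLines = new ArrayList<>();
        allLines.addAll(List.of("a,1.50", "b,2.00"));
        allLines.addAll(List.of("a,0.25"));
        allLines.addAll(List.of("c,10"));
        allLines.addAll(List.of("b,3.00", "a,1"));
        for (Row row : groupReduce(parse(allLines))) {
            System.out.println(row.key() + " -> " + row.amount());
        }
    }
}
```

Running main prints `a -> 2.75`, `b -> 5.00` and `c -> 10`, one group per key, in key order.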

 

When the four CSV files are small enough, the job finishes successfully. However, when the input files are large, the job fails with the exception shown below.
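
Input size plausibly matters because the sorter works within a limited memory budget: once a sort buffer is full, it is sorted and spilled to disk as a run, and the runs are merged afterwards. The DEBUG lines "Spilling sort buffer without large record handling." and "Spilling buffer 0." in the log below come from that path, and the first failure is thrown inside the spilling thread. Input that fits in memory may never take this path. The sketch below is a simplified external merge sort in plain Java, not Flink's UnilateralSortMerger; in-memory lists stand in for the spill files, and the parameter bufferCapacity is a hypothetical stand-in for the sorter's memory budget.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class SpillingSortSketch {

    // sorted: the fully merged output; spilledRuns: how many sorted runs were written out.
    record SortResult(List<Integer> sorted, int spilledRuns) {}

    // External merge sort with a fixed-size buffer (bufferCapacity must be > 0).
    // Full buffers are sorted and "spilled" as runs; here the runs stay in memory,
    // whereas a real sorter would write them to temporary files.
    static SortResult externalSort(List<Integer> input, int bufferCapacity) {
        List<List<Integer>> runs = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        for (int key : input) {
            buffer.add(key);
            if (buffer.size() == bufferCapacity) {
                Collections.sort(buffer);
                runs.add(buffer); // spill a full, sorted buffer
                buffer = new ArrayList<>();
            }
        }
        Collections.sort(buffer);
        if (runs.isEmpty()) {
            return new SortResult(buffer, 0); // buffer never filled: no spill needed
        }
        if (!buffer.isEmpty()) {
            runs.add(buffer); // the last, partially filled buffer becomes a run too
        }
        // k-way merge; heap entries are {value, runIndex, positionInRun}.
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
            heap.add(new int[] {runs.get(r).get(0), r, 0});
        }
        List<Integer> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            merged.add(top[0]);
            List<Integer> run = runs.get(top[1]);
            int next = top[2] + 1;
            if (next < run.size()) {
                heap.add(new int[] {run.get(next), top[1], next});
            }
        }
        return new SortResult(merged, runs.size());
    }

    public static void main(String[] args) {
        System.out.println(externalSort(List.of(5, 3, 9), 4));
        System.out.println(externalSort(List.of(8, 1, 7, 3, 9, 2, 6, 4, 5), 4));
    }
}
```

With a capacity of 4, the three-element input is sorted entirely in memory (spilledRuns=0), while the nine-element input produces three runs that have to be merged, so only the larger input exercises the spill-and-merge code.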

 

Could somebody please help me to fix this problem?

Best regards,
Ferry

2017-03-29 17:39:19,396 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,458 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.io.disk.RandomAccessInputView.nextSegment(RandomAccessInputView.java:79)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
        at org.apache.flink.runtime.memory.AbstractPagedInputView.readLong(AbstractPagedInputView.java:357)
        at org.apache.flink.core.memory.HybridMemorySegment.put(HybridMemorySegment.java:287)
        at org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:406)
        at org.apache.flink.api.common.typeutils.base.BigIntSerializer.copyBigInteger(BigIntSerializer.java:141)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.copy(BigDecSerializer.java:104)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:212)
        at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

2017-03-29 17:39:19,759 DEBUG org.apache.flink.runtime.operators.BatchTask                  - Releasing all broadcast variables.:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (15/16)
2017-03-29 17:39:19,660 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Spilling buffer 0.
2017-03-29 17:39:19,806 DEBUG org.apache.flink.runtime.operators.sort.NormalizedKeySorter   - Spilling sort buffer without large record handling.
2017-03-29 17:39:19,641 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        at java.lang.Thread.join(Thread.java:1323)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.close(UnilateralSortMerger.java:480)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:367)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$1.handleException(UnilateralSortMerger.java:362)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.internalHandleException(UnilateralSortMerger.java:842)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)

2017-03-29 17:39:19,641 ERROR org.apache.flink.runtime.operators.BatchTask                  - Error in task code:  CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor) (1/16)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at readCsvRows(SendungsauskunftAggregatorJob.java:140)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1094)
        at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
        ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.io.EOFException
        at org.apache.flink.runtime.util.DataInputDeserializer.readInt(DataInputDeserializer.java:179)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.readBigDecimal(BigDecSerializer.java:125)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:99)
        at org.apache.flink.api.common.typeutils.base.BigDecSerializer.deserialize(BigDecSerializer.java:31)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:193)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:36)
        at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1035)
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
2017-03-29 17:39:19,460 DEBUG org.apache.flink.runtime.operators.sort.UnilateralSortMerger  - Emitting final buffer from reader thread: 1.
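
Both traces end in the BigDecimal field of a Row: the spilling thread fails in BigDecSerializer.copy (calling BigIntSerializer.copyBigInteger), and the reading thread fails in BigDecSerializer.readBigDecimal (calling DataInputDeserializer.readInt). An EOFException at that point means the deserializer tried to read past the end of the serialized data, i.e. it expected more bytes than were there. One plausible reading is a mismatch between how decimal values are written and how they are read or copied, with larger inputs simply being more likely to exercise the spilling code. The plain-Java sketch below only illustrates that failure mode. Its byte layout (length-prefixed unscaled value followed by an int scale) is an assumption for illustration, not Flink's actual wire format, and the sketch does not reproduce the Flink failure itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

public class BigDecimalWireSketch {

    // Assumed record layout (illustrative only, not Flink's format):
    // [int length][length bytes of the unscaled value][int scale]
    static byte[] write(BigDecimal value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] unscaled = value.unscaledValue().toByteArray();
            out.writeInt(unscaled.length);
            out.write(unscaled);
            out.writeInt(value.scale());
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory stream
        }
        return bytes.toByteArray();
    }

    // Reads the layout back; readInt/readFully throw EOFException when the
    // record ends before all expected bytes have been read.
    static BigDecimal read(byte[] record) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
        byte[] unscaled = new byte[in.readInt()];
        in.readFully(unscaled);
        int scale = in.readInt();
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) throws IOException {
        byte[] record = write(new BigDecimal("12345678901234567890.42"));
        System.out.println("round trip: " + read(record));

        // Simulate a reader expecting more bytes than the writer produced
        // by dropping the trailing 4-byte scale field.
        byte[] shortRecord = Arrays.copyOf(record, record.length - 4);
        try {
            read(shortRecord);
        } catch (EOFException e) {
            System.out.println("EOFException while reading the scale");
        }
    }
}
```

When writer and reader agree on the layout, the value round-trips unchanged; as soon as the reader expects bytes that were never written, the failure surfaces as a bare EOFException from readInt, matching the shape of the "terminated due to an exception: null" messages above.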

 


Re: GroupReduce is interrupted on reading large CSV files

Kurt Young
Hi,
Can you share your input files to help us debug the problem?


Best,
Kurt

On Thu, Mar 30, 2017 at 12:00 AM, Sapei, Ferry Syafei wrote:
