Good afternoon dear Community,
For a few days now I have been struggling to understand the reason behind this KryoException. Here is the stack trace:

2017-06-07 10:18:52,514 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1)
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
Serialization trace:
blockData (my.org.path.benchmarks.matrices.flink.distributed.Block)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:264)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:274)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:98)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:519)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
Caused by: java.lang.IndexOutOfBoundsException: Index: 109, Size: 5
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.get(ArrayList.java:429)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:677)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 11 more
2017-06-07 10:18:52,594 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 2744/4096/4096 MB, NON HEAP: 78/80/-1 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 13, Total Capacity: 1390280, Used Memory: 1390281
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 14/15/240 MB (used/committed/max)], [Metaspace: 57/58/-1 MB (used/committed/max)], [Compressed Class Space: 6/7/1024 MB (used/committed/max)]
2017-06-07 10:18:52,766 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 17798, GC COUNT: 97], [G1 Old Generation, GC TIME (ms): 2373, GC COUNT: 1]
2017-06-07 10:18:52,841 INFO org.apache.flink.runtime.taskmanager.Task - CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:46)) (1/1) (c9e95f0475275a8b62886e0f34293a0d) switched from RUNNING to FAILED.

What I am doing is basically a matrix product: I load the matrices in COO format, and the Block class is the following (heavily inspired by https://issues.apache.org/jira/browse/FLINK-3920):

import breeze.linalg.{Matrix => BreezeMatrix}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{Matrix, SparseMatrix}

class Block(val blockData: Matrix) extends MatrixLayout with Serializable {

  def data: Matrix = blockData

  def toBreeze: BreezeMatrix[Double] = blockData.asBreeze

  def numRows: Int = data.numRows

  def numCols: Int = data.numCols

  def *(other: Block): Block = {
    require(this.numCols == other.numRows)
    Block((blockData.asBreeze * other.toBreeze).fromBreeze)
  }

  def +(other: Block): Block =
    Block((blockData.asBreeze + other.toBreeze).fromBreeze)

  def unary_+(other: Block): Block = this + other

  override def equals(other: Any): Boolean = {
    other match {
      case block: Block => this.blockData.equalsMatrix(block.blockData)
      case _ => false
    }
  }
}

The block matrix is a matrix of blocks; the group reduce function involved is the last step of the product function:

class SumGroupOfBlocks(blockMapper: BlockMapper) extends
  RichGroupReduceFunction[((Int, Int, Block), (Int, Int, Block)), (BlockID, Block)] {

  override def reduce(blocks: java.lang.Iterable[((Int, Int, Block), (Int, Int, Block))],
                      out: Collector[(BlockID, Block)]): Unit = {

    val multipliedGroup: Seq[(Int, Int, Block)] = blocks.collect {
      case ((i, j, left), (x, y, right)) => (i, y, left * right)
    }.toSeq

    val reducedGroup = multipliedGroup.reduce((left, right) => {
      val ((i, j, leftBlock), (_, _, rightBlock)) = (left, right)
      (i, j, leftBlock + rightBlock)
    })

    out.collect(blockMapper.blockIdFromCoo(reducedGroup._1, reducedGroup._2), reducedGroup._3)
  }
}

The exception described above occurs when I increase the matrix size to 2000x2000 (rows x cols) or larger. In other words, my code works with 1000x1000 matrices, but not with 2000x2000 matrices and above.
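The reduction that SumGroupOfBlocks completes is the block form of the matrix product: output block C(i, j) is the sum over k of A(i, k) * B(k, j). A minimal, hypothetical sketch of that arithmetic follows; it assumes plain dense Array[Array[Double]] blocks in place of Flink ML's Matrix and a Map keyed by (blockRow, blockCol) in place of the DataSet, so it illustrates only the math, not the job's join, grouping or serialization.

```scala
// Hypothetical, dependency-free sketch of a block-wise matrix product.
// Dense Array[Array[Double]] blocks stand in for Flink ML Matrix blocks,
// and a Map keyed by (blockRow, blockCol) stands in for the DataSet.
object BlockProduct {
  type Dense = Array[Array[Double]]

  // Plain dense product of two blocks (inner dimensions must agree).
  def mul(a: Dense, b: Dense): Dense = {
    require(a.head.length == b.length, "inner block dimensions must agree")
    Array.tabulate(a.length, b.head.length) { (r, c) =>
      b.indices.map(k => a(r)(k) * b(k)(c)).sum
    }
  }

  // Element-wise sum of two equally sized blocks.
  def add(a: Dense, b: Dense): Dense =
    Array.tabulate(a.length, a.head.length)((r, c) => a(r)(c) + b(r)(c))

  // Pair every left block (i, k) with every right block (k, j), multiply
  // each pair, then sum the products per output block (i, j). The final
  // per-(i, j) sum is the step a group reduce over (i, j) groups performs.
  def multiply(left: Map[(Int, Int), Dense],
               right: Map[(Int, Int), Dense]): Map[(Int, Int), Dense] = {
    val products = for {
      ((i, k), a) <- left.toSeq
      ((k2, j), b) <- right.toSeq
      if k == k2
    } yield ((i, j), mul(a, b))
    products.groupBy(_._1).map { case (ij, group) =>
      ij -> group.map(_._2).reduce((x, y) => add(x, y))
    }
  }
}
```

Splitting two 4x4 matrices into 2x2 blocks and calling BlockProduct.multiply should give the same entries as BlockProduct.mul on the full matrices.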
I think it is also worth mentioning that the IndexOutOfBoundsException always looks up index 109 (for different matrix sizes), while the size of the array varies between 5 and 7. It looks as if the serialized messages are somehow truncated right before they are delivered.

I tried the following, in no particular order, and none of it worked:
- using flink-1.2.0 and flink-1.3.0
- updating the Flink Kryo library to 3.0.3
- running with parallelism 1
- explicitly registering my custom classes with Kryo
- varying the size of my blocks
- increasing akka.framesize

I execute this job on a three-node cluster with 2 vCPUs per node: two TaskManagers (TM), two task slots (TS) per TM, a 6 GB TaskManager heap, 16384 numOfBuffers and 16384 networkBufferSize. If I run the code on my laptop with 2000x2000 matrices, it works, probably because no remote serialization is involved.

I really hope someone can help here. It's becoming really painful...

Thank you so much.
Cheers,
Andrea
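The innermost frames point at Kryo's MapReferenceResolver. With reference tracking enabled, the reader keeps a list of the objects it has already read for the current top-level object, and a back-reference in the byte stream is an index into that list. "Index: 109, Size: 5" therefore means the reader decoded a back-reference ID it never assigned, which is consistent with the bytes being read not lining up with what the writer produced. The sketch below is a simplified, hypothetical model of that mechanism, not Kryo's code (it compares values by equality, whereas Kryo tracks object identity): an aligned stream round-trips, while starting to read mid-stream fails with the same kind of IndexOutOfBoundsException.

```scala
import scala.collection.mutable

// Simplified, hypothetical model of reference tracking, loosely following
// the idea behind Kryo's MapReferenceResolver. This is NOT Kryo's code, and
// it compares values by equality where Kryo tracks object identity.
object RefModel {
  sealed trait Token
  final case class NewObject(value: String) extends Token // first occurrence, written inline
  final case class BackRef(id: Int) extends Token         // repeat, written as an ID

  // Writer: assign IDs in first-seen order; repeats become back-references.
  def write(values: Seq[String]): Vector[Token] = {
    val ids = mutable.Map.empty[String, Int]
    values.map { v =>
      ids.get(v) match {
        case Some(id) => BackRef(id)
        case None =>
          ids(v) = ids.size // next ID = number of distinct objects seen so far
          NewObject(v)
      }
    }.toVector
  }

  // Reader: rebuild the same table in the same order. A back-reference is
  // just an index into it, so a misaligned stream yields an unknown index.
  def read(tokens: Seq[Token]): Vector[String] = {
    val seen = mutable.ArrayBuffer.empty[String]
    tokens.map {
      case NewObject(v) => seen += v; v
      case BackRef(id)  => seen(id) // IndexOutOfBoundsException if out of sync
    }.toVector
  }
}
```

Kryo's setReferences(false), which Gordon suggests below, turns this tracking off: every occurrence is then written inline and there is no table to index into.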
Hi Andrea,

I did some quick issue searching, and it seems like this is a frequently reported issue with Kryo: https://github.com/EsotericSoftware/kryo/issues/428. I can’t say for sure yet whether the resolution / workaround mentioned there makes sense here; I’ll have to investigate a bit more.

Also, to clarify: from the stack trace, it seems like you’re simply using whatever serializer Kryo defaults to (i.e. FieldSerializer) and not registering your own. Is that correct?

In the meantime, could you also try the following, rebuild Flink, and test whether it works: in https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L349, change setReferences to false.

Cheers,
Gordon

On 7 June 2017 at 3:39:55 PM, Andrea Spina ([hidden email]) wrote:
@Flavio, doesn’t this look like the exception you often encountered a while back? If I remember correctly, that was fixed by Kurt, right?
Best, Aljoscha
Yes, it looks very similar to the exception I experienced (https://issues.apache.org/jir
I once had very similar problems (see [2] and [3]), and I was able to avoid them by removing the reuse of the input and output within KryoSerializer (as discussed in [3]). I hope that helps.

Best,
Flavio

On Thu, Jun 8, 2017 at 11:39 AM, Aljoscha Krettek <[hidden email]> wrote:
Hi guys,
Thank you for your interest. Yes @Flavio, I tried both the 1.2.0 and the 1.3.0 versions. Following Gordon's suggestion, I tried setting setReferences to false, but sadly it didn't help. Then I declared a custom serializer as follows:

class BlockSerializer extends Serializer[Block] with Serializable {

  override def read(kryo: Kryo, input: Input, block: Class[Block]): Block = {
    val serializer = new SparseMatrixSerializer
    val blockData = kryo.readObject(input, classOf[SparseMatrix], serializer)
    new Block(blockData)
  }

  override def write(kryo: Kryo, output: Output, block: Block): Unit = {
    val serializer = new SparseMatrixSerializer
    kryo.register(classOf[SparseMatrix], serializer)
    kryo.writeObject(output, block.blockData, serializer)
    output.close()
  }
}

class SparseMatrixSerializer extends Serializer[SparseMatrix] with Serializable {

  override def read(kryo: Kryo, input: Input, sparse: Class[SparseMatrix]): SparseMatrix = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    val numRows = input.readInt
    val numCols = input.readInt
    val colPtrs = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val rowIndices = kryo.readObject(input, classOf[java.util.ArrayList[Int]], collectionIntSerializer).asScala.toArray
    val data = kryo.readObject(input, classOf[java.util.ArrayList[Double]], collectionDoubleSerializer).asScala.toArray

    input.close()

    new SparseMatrix(numRows = numRows, numCols = numCols, colPtrs = colPtrs,
      rowIndices = rowIndices, data = data)
  }

  override def write(kryo: Kryo, output: Output, sparseMatrix: SparseMatrix): Unit = {
    val collectionIntSerializer = new CollectionSerializer()
    collectionIntSerializer.setElementClass(classOf[Int], new IntSerializer)
    val collectionDoubleSerializer = new CollectionSerializer()
    collectionDoubleSerializer.setElementClass(classOf[Double], new DoubleSerializer)

    kryo.register(classOf[java.util.ArrayList[Int]], collectionIntSerializer)
    kryo.register(classOf[java.util.ArrayList[Double]], collectionDoubleSerializer)

    output.writeInt(sparseMatrix.numRows)
    output.writeInt(sparseMatrix.numCols)
    kryo.writeObject(output, sparseMatrix.colPtrs.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.rowIndices.toList.asJava, collectionIntSerializer)
    kryo.writeObject(output, sparseMatrix.data.toList.asJava, collectionDoubleSerializer)

    output.close()
  }
}

What I got is the same exception as before, but with a different index and size:

Caused by: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at my.org.path.benchmarks.matrices.flink.distributed.BlockMatrix.$times(BlockMatrix.scala:103)) -> Map (Map at my.org.path.benchmarks.matrices.flink.MatrixMultiplication$.main(MatrixMultiplication.scala:189))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1095)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:99)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:460)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Index: 1, Size: 0
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:799)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:653)
    at java.util.ArrayList.set(ArrayList.java:444)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.setReadObject(MapReferenceResolver.java:38)
    at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:823)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:680)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:85)
    at my.org.path.benchmarks.matrices.flink.SerializationBlah$BlockSerializer.read(MatrixMultiplication.scala:80)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:250)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:120)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:113)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:31)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:145)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:144)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:42)

Could this help somehow?

Thank you again,
Andrea
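Independently of the root cause, the custom serializers above close the Input/Output they are handed (input.close(), output.close()) and call kryo.register(...) from inside write(), which only affects the Kryo instance on the writing side. Note also that classOf[java.util.ArrayList[Int]] and classOf[java.util.ArrayList[Double]] are the same class after erasure, so both register calls target the same class. A more conventional pattern is to write the fields in a fixed order, read them back in exactly that order, leave the stream open for the caller, and register serializers once up front (in Flink, for example, via ExecutionConfig.registerTypeWithKryoSerializer). The following is a minimal sketch of only the symmetric-encoding part, assuming java.io.DataOutputStream/DataInputStream and a hypothetical CscMatrix case class in place of Flink ML's SparseMatrix so it runs without Flink or Kryo; inside a real Kryo Serializer the same sequence would go through Output.writeInt / Input.readInt and related methods. It is not a confirmed fix for the exception in this thread.

```scala
import java.io.{DataInputStream, DataOutputStream}

// Hypothetical stand-in for Flink ML's SparseMatrix (compressed sparse
// column layout), so the sketch runs without Flink or Kryo on the classpath.
final case class CscMatrix(numRows: Int, numCols: Int,
                           colPtrs: Array[Int], rowIndices: Array[Int], data: Array[Double])

object CscCodec {
  // Length-prefixed primitive arrays: the reader always knows how much to consume.
  private def writeInts(out: DataOutputStream, xs: Array[Int]): Unit = {
    out.writeInt(xs.length)
    xs.foreach(x => out.writeInt(x))
  }

  private def readInts(in: DataInputStream): Array[Int] =
    Array.fill(in.readInt())(in.readInt())

  // Writes every field in a fixed order. It does not close `out`:
  // the stream belongs to the caller and later records follow on it.
  def write(out: DataOutputStream, m: CscMatrix): Unit = {
    out.writeInt(m.numRows)
    out.writeInt(m.numCols)
    writeInts(out, m.colPtrs)
    writeInts(out, m.rowIndices)
    out.writeInt(m.data.length)
    m.data.foreach(d => out.writeDouble(d))
  }

  // Reads the fields back in exactly the order write() produced them.
  def read(in: DataInputStream): CscMatrix = {
    val numRows = in.readInt()
    val numCols = in.readInt()
    val colPtrs = readInts(in)
    val rowIndices = readInts(in)
    val data = Array.fill(in.readInt())(in.readDouble())
    CscMatrix(numRows, numCols, colPtrs, rowIndices, data)
  }
}
```

Because neither method closes the stream, several matrices can be written back to back on one stream and read back one after another without losing alignment.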
Hi Andrea,

I’m circling back to this and wanted to check on the status. Have you managed to solve this in the end, or is it still a problem for you?

If it’s still a problem, would you be able to provide a complete runnable example job that reproduces the problem (ideally via a git branch I can clone and run :))? This would help me dig a bit deeper into the issue.

Thanks a lot!

Best,
Gordon

On 8 June 2017 at 6:58:46 PM, Andrea Spina ([hidden email]) wrote:
Hi Gordon, sadly there is no news since my last message.
In the end I worked around the issue; I was not able to solve it. I'll try to provide a runnable example as soon as possible.

Thank you.
Andrea
Thanks a lot, Andrea!

On 21 June 2017 at 8:36:32 PM, Andrea Spina ([hidden email]) wrote: