FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Andrea Spina-2
Hi everyone.
I'm running the FlinkML ALS matrix factorization and I bumped into the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
... 2 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
at java.lang.Thread.run(Thread.java:745)

I'm running with flink-1.0.3. I really can't figure out the reason behind that.

My code simply calls the library as follows:

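import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.ml.recommendation.ALS

// Configure the ALS learner
val als = ALS()
  .setIterations(numIterations)
  .setNumFactors(rank)
  .setBlocks(degreeOfParallelism)
  .setSeed(42)
  .setTemporaryPath(tempPath)

// Train the model; the exception is thrown from inside this call
als.fit(ratings, parameters)

// Retrieve the learned user and item factors
val (users, items) = als.factorsOption match {
  case Some(factors) => factors
  case None => throw new RuntimeException("ALS model has not been fitted")
}

// Note: both sinks write to the same outputPath
users.writeAsText(outputPath, WriteMode.OVERWRITE)
items.writeAsText(outputPath, WriteMode.OVERWRITE)

env.execute("ALS matrix factorization")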

where 
- ratings, the input dataset, contains (uid, iid, rate) rows for about 8e6 users and 1e6 items, with an average of 700 ratings per user.
- numIterations 10
- rank 50
- degreeOfParallelism 240

The error seems to be related to the final .persist() call:
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)

I'm running on a 15-node cluster (16 CPUs per node) with the following relevant properties:

jobmanager.heap.mb = 2048
taskmanager.memory.fraction = 0.5
taskmanager.heap.mb = 28672
taskmanager.network.bufferSizeInBytes = 32768
taskmanager.network.numberOfBuffers = 98304
akka.ask.timeout = 300s
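
For a rough sense of scale, the figures above can be turned into a back-of-envelope estimate. This is only a sketch under stated assumptions: the object name `AlsSizeEstimate` is made up for illustration, each rating is assumed to occupy 16 bytes as an (Int, Int, Double) triple, each factor is assumed to be an 8-byte Double, and serialization or object overhead is ignored. It does not model how Flink actually lays out memory.

```scala
object AlsSizeEstimate {
  // Figures taken from the post
  val users: Long = 8000000L           // about 8e6 users
  val items: Long = 1000000L           // about 1e6 items
  val ratingsPerUser: Long = 700L      // average ratings per user
  val rank: Long = 50L                 // number of latent factors
  val bufferSizeBytes: Long = 32768L   // taskmanager.network.bufferSizeInBytes
  val numBuffers: Long = 98304L        // taskmanager.network.numberOfBuffers

  // Total number of (uid, iid, rate) rows
  val totalRatings: Long = users * ratingsPerUser

  // Assumption: 4 + 4 + 8 bytes per rating, no overhead
  val rawRatingBytes: Long = totalRatings * 16L

  // Assumption: one 8-byte Double per factor entry, for users and items
  val factorBytes: Long = (users + items) * rank * 8L

  // Memory taken by network buffers on one TaskManager
  val networkBufferBytes: Long = bufferSizeBytes * numBuffers

  def toGiB(bytes: Long): Double = bytes.toDouble / (1024L * 1024L * 1024L)

  def main(args: Array[String]): Unit = {
    println(s"ratings:         $totalRatings rows, ${toGiB(rawRatingBytes)} GiB raw")
    println(s"factor matrices: ${toGiB(factorBytes)} GiB")
    println(s"network buffers: ${toGiB(networkBufferBytes)} GiB per TaskManager")
  }
}
```

None of this shows that memory is the cause of the failure; it only puts the input size and the configured buffer pool side by side.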

Any help will be appreciated. Thank you.

--
Andrea Spina
N.Tessera: 74598
MAT: 89369
Ingegneria Informatica [LM] (D.M. 270)

Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Stefan Richter
Hi,

could you provide the log outputs for your job (ideally with debug logging enabled)?

Best,
Stefan

On 31.08.2016 at 14:40, ANDREA SPINA <[hidden email]> wrote:

Hi everyone.
I'm running the FlinkML ALS matrix factorization and I bumped into the following exception: [...]


Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Flavio Pompermaier
I don't know whether my usual error is related to this one, but it is very similar and it happens randomly. I still have to figure out its root cause:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <[hidden email]> wrote:
Hi,

could you provide the log outputs for your job (ideally with debug logging enabled)?

Best,
Stefan


Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Andrea Spina-2
Sure. Here you can find the complete log file.
I still cannot get past the issue. Thank you for your help.

2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I don't know whether my usual error is related to this one but is very similar and it happens randomly...I still have to figure out the root cause of the error:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <[hidden email]> wrote:
Hi,

could you provide the log outputs for your job (ideally with debug logging enabled)?

Best,
Stefan

Am 31.08.2016 um 14:40 schrieb ANDREA SPINA <[hidden email]>:

Hi everyone.
I'm running the FlinkML ALS matrix factorization and I bumped into the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
... 2 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
at java.lang.Thread.run(Thread.java:745)

I'm running with flink-1.0.3. I really can't figure out the reason behind that.

My code simply calls the library as follows:

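import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.ml.recommendation.ALS

// Configure the factorization; the number of blocks equals the degree of parallelism.
val als = ALS()
  .setIterations(numIterations)
  .setNumFactors(rank)
  .setBlocks(degreeOfParallelism)
  .setSeed(42)
  .setTemporaryPath(tempPath)

// `parameters` holds extra fit parameters and is defined elsewhere (not shown).
als.fit(ratings, parameters)

// factorsOption is only defined after a successful fit.
val (users, items) = als.factorsOption.getOrElse(
  throw new RuntimeException("ALS factors are not available"))

// Separate directories, so the two sinks do not overwrite each other.
users.writeAsText(s"$outputPath/users", WriteMode.OVERWRITE)
items.writeAsText(s"$outputPath/items", WriteMode.OVERWRITE)

env.execute("ALS matrix factorization")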

where:
- ratings, the input dataset, contains (uid, iid, rate) rows for about 8e6 users and 1e6 items, with 700 ratings per user on average
- numIterations = 10
- rank = 50
- degreeOfParallelism = 240

The error seems to be related to the final persist() call:
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)

I'm running on a 15-node cluster (16 CPUs per node) with the following relevant properties:

jobmanager.heap.mb = 2048
taskmanager.memory.fraction = 0.5
taskmanager.heap.mb = 28672
taskmanager.network.bufferSizeInBytes = 32768
taskmanager.network.numberOfBuffers = 98304
akka.ask.timeout = 300s
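
As a rough sanity check on the network settings above, the sketch below multiplies the buffer count by the buffer size and compares the count with the rule of thumb given in the Flink configuration documentation (slots per TaskManager squared, times the number of TaskManagers, times 4). The object and method names are made up for illustration, and 16 slots per TaskManager is an assumption (one slot per CPU); the post does not state the slot count.

```scala
object NetworkBufferBudget {
  /** Total bytes reserved for network buffers: buffer count times buffer size. */
  def networkBufferBytes(numberOfBuffers: Long, bufferSizeInBytes: Long): Long =
    numberOfBuffers * bufferSizeInBytes

  /** Rule-of-thumb buffer count: slotsPerTm^2 * taskManagers * 4. */
  def recommendedBuffers(slotsPerTm: Int, taskManagers: Int): Long =
    slotsPerTm.toLong * slotsPerTm * taskManagers * 4

  def main(args: Array[String]): Unit = {
    // Values taken from the configuration listed in the post.
    val bytes = networkBufferBytes(98304L, 32768L)
    println(s"network buffers: ${bytes / (1024 * 1024)} MiB")
    // ASSUMPTION: 16 slots per TaskManager (one per CPU); 15 TaskManagers (one per node).
    println(s"rule-of-thumb buffer count: ${recommendedBuffers(16, 15)}")
  }
}
```

With the values from the post this gives 3072 MiB of network buffers per TaskManager and a rule-of-thumb count of 15360, so the configured 98304 buffers are well above that guideline under the stated assumption.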

Any help will be appreciated. Thank you.

--
Andrea Spina
N.Tessera: 74598
MAT: 89369
Ingegneria Informatica [LM] (D.M. 270)






Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Stefan Richter
Hi,

Unfortunately, the log does not contain the information required for this case. It seems that a sender to the SortMerger failed. The best way to track down the problem is to look at the exceptions reported in the web front-end for the failing job. Could you check whether any exceptions are reported there and provide them to us?
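
To illustrate why the reported message ends in "null", here is a minimal sketch that walks a cause chain down to its innermost exception. The exception objects are stand-ins built from JDK classes, not Flink's own; the assumption is that the outer message is formed by appending the inner exception's getMessage, which is null when the inner exception carries no message (as the ProducerFailedException in the trace appears to). The names RootCause and rootCause are hypothetical.

```scala
import scala.annotation.tailrec

object RootCause {
  /** Follows getCause until the innermost exception is reached. */
  @tailrec
  def rootCause(t: Throwable): Throwable = {
    val cause = t.getCause
    if (cause == null || (cause eq t)) t else rootCause(cause)
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for an exception without a message (getMessage returns null).
    val inner = new IllegalStateException()
    // Appending a null message yields the trailing "null" seen in the trace.
    val outer = new java.io.IOException(
      "Thread 'SortMerger Reading Thread' terminated due to an exception: " + inner.getMessage,
      inner)
    val top = new RuntimeException("Error obtaining the sorted input", outer)

    val root = rootCause(top)
    println(outer.getMessage)
    println(s"root cause: ${root.getClass.getName}, message: ${root.getMessage}")
  }
}
```

The innermost exception only says that the producer failed; the producer's own failure has to be looked up separately, which is why the web front-end or the TaskManager logs are the place to check.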

Best,
Stefan

On 01.09.2016 at 11:56, ANDREA SPINA <[hidden email]> wrote:

Sure. Here you can find the complete log file.
I still can't get past the issue. Thank you for your help.

2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I don't know whether my usual error is related to this one, but it is very similar and it happens randomly. I still have to figure out the root cause of the error:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)


On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <[hidden email]> wrote:
Hi,

Could you provide the log output for your job (ideally with debug logging enabled)?

Best,
Stefan


Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Andrea Spina-2
Hi Stefan,
Thank you so much for the answer. OK, I'll do it ASAP.
For the sake of argument, could the issue be related to the low number of blocks? I noticed that the Flink implementation, by default, sets the number of blocks to the input count (which is actually a lot). So with few, large blocks, maybe they don't fit somewhere...
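
To put rough numbers on that question, here is a back-of-the-envelope sketch of how much raw data ends up in one block for the dataset described in the first post. It assumes ratings are (Int, Int, Double) tuples (16 bytes of payload each), that factors are vectors of `rank` doubles, and that data is spread evenly over the blocks; JVM object overhead and ALS's internal bookkeeping structures are ignored, so real memory use would be higher. All names are hypothetical.

```scala
object AlsBlockEstimate {
  final case class Estimate(
      ratingsPerBlock: Long,
      ratingBytesPerBlock: Long,
      factorBytesPerBlock: Long)

  def estimate(users: Long, items: Long, ratingsPerUser: Long, rank: Int, blocks: Int): Estimate = {
    val ratingsPerBlock = users * ratingsPerUser / blocks
    // ASSUMPTION: (Int, Int, Double) payload = 4 + 4 + 8 bytes per rating.
    val ratingBytes = ratingsPerBlock * 16L
    // User and item factor vectors, `rank` doubles (8 bytes) each.
    val factorBytes = (users + items) * rank * 8L / blocks
    Estimate(ratingsPerBlock, ratingBytes, factorBytes)
  }

  def main(args: Array[String]): Unit = {
    // 240 is the block count from the post; 960 is a hypothetical value for comparison.
    for (blocks <- Seq(240, 960)) {
      val e = estimate(8000000L, 1000000L, 700L, 50, blocks)
      println(s"blocks=$blocks ratings/block=${e.ratingsPerBlock} " +
        s"ratingMB=${e.ratingBytesPerBlock / 1000000} factorMB=${e.factorBytesPerBlock / 1000000}")
    }
  }
}
```

Under these assumptions, 240 blocks means roughly 23 million ratings (about 373 MB of raw payload) per block, so more blocks would shrink each block proportionally.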
Thank you again.

Andrea


Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Andrea Spina-2
OK, I'm still struggling with ALS. Now I'm running with a dataset of 2M users, 250K items, and 700 ratings per user (1.4B ratings), with 50 latent factors, 400 blocks, and a DOP of 400.

I hit the error again. In the JM log I find the previously mentioned exception:

09/06/2016 19:30:18     CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))(62/400) switched to FAILED 
java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
        at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 5 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.

And here is the corresponding TM log:

java.lang.OutOfMemoryError: Java heap space
2016-09-06 19:30:17,958 ERROR org.apache.flink.runtime.taskmanager.TaskManager              - Could not update input data location for task CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572)). Trying to fail  task.
java.lang.IllegalStateException: There has been an error in the channel.
        at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:78)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:117)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:284)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(TaskManager.scala:1076)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Each node has 32 GB of memory, and I'm working with
taskmanager.heap.mb = 28672

I also tried different memory fractions:
taskmanager.memory.fraction = (0.5, 0.6, 0.8)
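
For reference, taskmanager.memory.fraction is the share of the TaskManager heap that Flink reserves as managed memory (sorting, hash tables, caching); the rest stays available for objects created by user code. The sketch below shows that split for the three fractions tried. It is a simplification: it ignores network buffers and JVM overhead, and the names are hypothetical.

```scala
object HeapSplit {
  /** Heap (MiB) left for user-code objects once the managed share is reserved. */
  def userHeapMb(heapMb: Long, managedFraction: Double): Long =
    math.round(heapMb * (1.0 - managedFraction))

  def main(args: Array[String]): Unit = {
    val heapMb = 28672L // taskmanager.heap.mb from the post
    for (fraction <- Seq(0.5, 0.6, 0.8)) {
      val user = userHeapMb(heapMb, fraction)
      println(s"fraction=$fraction managed=${heapMb - user} MiB userCode=$user MiB")
    }
  }
}
```

Raising the fraction therefore shrinks the heap left for user-code objects, which may matter here given that the TaskManager reports "Java heap space" rather than a shortage of managed memory.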

Hope you have enough info now.
Thank you for your help.

Andrea

at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
... 2 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
at java.lang.Thread.run(Thread.java:745)

I'm running with flink-1.0.3. I really can't figure out the reason behind that.

My code simply calls the library as follows:

val als = ALS()
  .setIterations(numIterations)
  .setNumFactors(rank)
  .setBlocks(degreeOfParallelism)
  .setSeed(42)
  .setTemporaryPath(tempPath)

als.fit(ratings, parameters)

val (users, items) = als.factorsOption match {
  case Some(factors) => factors
  case None => throw new RuntimeException("ALS factors are not available after fit")
}

users.writeAsText(outputPath, WriteMode.OVERWRITE)
items.writeAsText(outputPath, WriteMode.OVERWRITE)

env.execute("ALS matrix factorization")

where 
- ratings, the input dataset, contains (uid, iid, rate) rows for about 8e6 users and 1e6 items, with an average of 700 ratings per user.
- numIterations 10
- rank 50
- degreeOfParallelism 240

The error seems to be related to the final .persist() call:
at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)

I'm running on a 15-node cluster (16 CPUs per node) with the following relevant properties:

jobmanager.heap.mb = 2048
taskmanager.memory.fraction = 0.5
taskmanager.heap.mb = 28672
taskmanager.network.bufferSizeInBytes = 32768
taskmanager.network.numberOfBuffers = 98304
akka.ask.timeout = 300s

Any help will be appreciated. Thank you.

--
Andrea Spina
N.Tessera: 74598
MAT: 89369
Ingegneria Informatica [LM] (D.M. 270)

Re: FlinkML ALS matrix factorization: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null

Till Rohrmann
Hi Andrea,

The exception says that you don't have enough heap memory available to keep a factors block in memory. An object always has to be created on the heap when the user function is called.

You can try the following to solve the problem:

1. Further decrease taskmanager.memory.fraction: This causes the TaskManager to allocate less managed memory and leaves more free heap memory available.
2. Decrease the number of slots on the TaskManager: This decreases the number of concurrently running user functions and thus the number of objects that have to be kept on the heap.
3. Increase the number of ALS blocks with `als.setBlocks(numberBlocks)`: This increases the number of blocks into which the factor matrices are split. A larger number means that each individual block is smaller and thus needs less memory on the heap.
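
To make point 3 a bit more concrete, here is a rough back-of-envelope sketch in Scala. It only uses the dataset numbers quoted in this thread (2M users, 50 factors, 1.4B ratings); `FactorBlockSizing` and its methods are hypothetical helpers for illustration, not part of FlinkML. It counts only the raw `Double` payload and ignores JVM object overhead and routing metadata, so the real heap footprint per block is likely noticeably larger.

```scala
// Hypothetical helper for illustration only; not a FlinkML API.
object FactorBlockSizing {

  /** Raw payload in bytes of one factor block:
    * rows per block * factors per row * 8 bytes per Double.
    * Treat the result as a lower bound on the heap actually needed. */
  def factorBlockBytes(numRows: Long, numFactors: Int, numBlocks: Int): Long = {
    val rowsPerBlock = (numRows + numBlocks - 1) / numBlocks // ceiling division
    rowsPerBlock * numFactors * 8L
  }

  /** Number of ratings that land in one block if they are spread evenly
    * (an assumption; skewed data can make some blocks much larger). */
  def ratingsPerBlock(numRatings: Long, numBlocks: Int): Long =
    (numRatings + numBlocks - 1) / numBlocks

  def main(args: Array[String]): Unit = {
    val users = 2000000L      // 2M users, from the thread
    val factors = 50          // 50 latent factors, from the thread
    val ratings = 1400000000L // 1.4B ratings, from the thread

    // Doubling the block count halves both numbers.
    for (blocks <- Seq(400, 800, 1600)) {
      val mib = factorBlockBytes(users, factors, blocks) / (1024.0 * 1024.0)
      val perBlock = ratingsPerBlock(ratings, blocks)
      println(f"blocks=$blocks%5d  user factors per block ~ $mib%.2f MiB  ratings per block = $perBlock%d")
    }
  }
}
```

The point of the sketch is only the scaling: both the factor payload and the number of ratings per block shrink in proportion to the block count, which is why a larger value for `setBlocks` lowers the per-task heap demand.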

I hope this helps you to solve the problem.

Cheers,
Till

On Wed, Sep 7, 2016 at 11:57 AM, ANDREA SPINA <[hidden email]> wrote:
Ok, I'm still struggling with ALS. Now I'm running with a dataset of 2M users, 250K items and 700 ratings per user (1.4B ratings), with 50 latent factors, 400 numOfBlocks and 400 DOP.

I still got the error. From the JM log I caught the previously mentioned exception:

09/06/2016 19:30:18     CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))(62/400) switched to FAILED 
java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
        at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
        at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
        at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
        at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
        ... 5 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
        at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.

Then, the respective TM log:

java.lang.OutOfMemoryError: Java heap space
2016-09-06 19:30:17,958 ERROR org.apache.flink.runtime.taskmanager.TaskManager              - Could not update input data location for task CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572)). Trying to fail  task.
java.lang.IllegalStateException: There has been an error in the channel.
        at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:78)
        at org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
        at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:117)
        at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:284)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(TaskManager.scala:1076)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

So each node has 32G memory, I'm working with
taskmanager.heap.mb = 28672

And I tried with different memory fractions
taskmanager.memory.fraction = (0.5, 0.6, 0.8)
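
As a rough illustration of what these fractions mean for the free heap, here is a small Scala sketch. It rests on two assumptions that are not confirmed in this thread: that the network buffer settings from the first message (98304 buffers of 32768 bytes) are unchanged, and that in on-heap mode the network buffers are allocated first and `taskmanager.memory.fraction` is then applied to the remaining heap. `HeapSplit` is a hypothetical helper, not a Flink API.

```scala
// Hypothetical helper for illustration only; not a Flink API.
object HeapSplit {

  /** Returns (managedMiB, freeHeapMiB) under the assumptions stated above:
    * network buffers are carved out of the heap first, then `fraction` of
    * the remainder becomes managed memory and the rest stays free for
    * objects created by user functions. */
  def split(heapMiB: Long, numBuffers: Long, bufferBytes: Long, fraction: Double): (Long, Long) = {
    val networkMiB = numBuffers * bufferBytes / (1024L * 1024L)
    val remaining = heapMiB - networkMiB
    val managed = math.round(remaining * fraction)
    (managed, remaining - managed)
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the configuration quoted in this thread.
    for (fraction <- Seq(0.5, 0.6, 0.8)) {
      val (managed, free) = split(28672L, 98304L, 32768L, fraction)
      println(s"fraction=$fraction managed=$managed MiB free=$free MiB")
    }
  }
}
```

Under these assumptions, raising the fraction from 0.5 to 0.8 shrinks the free heap per TaskManager from about 12.5 GiB to about 5 GiB, and that free heap is shared by all slots running on the TaskManager.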

Hope you have enough info now.
Thank you for your help.

Andrea

2016-09-02 11:30 GMT+02:00 ANDREA SPINA <[hidden email]>:
Hi Stefan,
Thank you so much for the answer. Ok, I'll do it asap.
For the sake of argument, could the issue be related to the low number of blocks? I noticed that the Flink implementation, by default, sets the number of blocks to the input count (which is actually a lot). So with a low number of large blocks, maybe they don't fit somewhere...
Thank you again.

Andrea

2016-09-02 10:51 GMT+02:00 Stefan Richter <[hidden email]>:
Hi,

unfortunately, the log does not contain the required information for this case. It seems like a sender to the SortMerger failed. The best way to track down this problem is to look at the exceptions that are reported in the web front-end for the failing job. Could you check whether you find any reported exceptions there and provide them to us?

Best,
Stefan

On 01.09.2016 at 11:56, ANDREA SPINA <[hidden email]> wrote:

Sure. Here you can find the complete log files.
I still cannot get past the issue. Thank you for your help.

2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <[hidden email]>:
I don't know whether my usual error is related to this one, but it is very similar and it happens randomly... I still have to figure out the root cause of the error:

java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at java.util.ArrayList.elementData(ArrayList.java:418)
at java.util.ArrayList.get(ArrayList.java:431)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

