Hi everyone.
I'm running the FlinkML ALS matrix factorization and ran into the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
    at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:652)
    at org.apache.flink.ml.common.FlinkMLTools$.persist(FlinkMLTools.scala:94)
    at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)
    at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:433)
    at org.apache.flink.ml.pipeline.Estimator$class.fit(Estimator.scala:55)
    at org.apache.flink.ml.recommendation.ALS.fit(ALS.scala:122)
    at dima.tu.berlin.benchmark.flink.als.RUN$.main(RUN.scala:78)
    at dima.tu.berlin.benchmark.flink.als.RUN.main(RUN.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Initializing the input processing failed: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:325)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.BatchTask.initLocalStrategies(BatchTask.java:819)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:321)
    ... 2 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.getNextLookAhead(LocalInputChannel.java:270)
    at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.onNotification(LocalInputChannel.java:238)
    at org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.release(PipelinedSubpartition.java:158)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.release(ResultPartition.java:320)
    at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.releasePartitionsProducedBy(ResultPartitionManager.java:95)
    at org.apache.flink.runtime.io.network.NetworkEnvironment.unregisterTask(NetworkEnvironment.java:370)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:657)
    at java.lang.Thread.run(Thread.java:745)

I'm running flink-1.0.3, and I really can't figure out the reason behind this. My code simply calls the library as follows:

    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.ml.recommendation.ALS

    val als = ALS()
      .setIterations(numIterations)
      .setNumFactors(rank)
      .setBlocks(degreeOfParallelism)
      .setSeed(42)
      .setTemporaryPath(tempPath)

    als.fit(ratings, parameters)

    val (users, items) = als.factorsOption match {
      case Some(factors) => factors
      case None => throw new RuntimeException("ALS did not compute any factors")
    }

    // Write users and items to separate paths; with one shared path and
    // WriteMode.OVERWRITE, one sink would clobber the other.
    users.writeAsText(outputPath + "/users", WriteMode.OVERWRITE)
    items.writeAsText(outputPath + "/items", WriteMode.OVERWRITE)

    env.execute("ALS matrix factorization")

where:
- ratings, the input DataSet, contains (uid, iid, rating) rows: about 8e6 users, 1e6 items and 700 ratings per user on average;
- numIterations = 10;
- rank = 50;
- degreeOfParallelism = 240.

The error seems to be related to the final persist() call made inside ALS.fit:
    at org.apache.flink.ml.recommendation.ALS$$anon$115.fit(ALS.scala:507)

I'm running on a 15-node cluster (16 CPUs per node) with the following relevant properties:
    jobmanager.heap.mb = 2048
    taskmanager.memory.fraction = 0.5
    taskmanager.heap.mb = 28672
    taskmanager.network.bufferSizeInBytes = 32768
    taskmanager.network.numberOfBuffers = 98304
    akka.ask.timeout = 300s

Any help will be appreciated. Thank you.

Andrea Spina
Card no.: 74598, Student ID: 89369
Computer Engineering, Master's degree [LM] (D.M. 270)
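As a back-of-the-envelope check of the network settings above (an addition, not part of the original post), the sketch below multiplies the configured buffer count by the buffer size and compares it with the sizing heuristic from the Flink 1.x configuration docs (#slots-per-TM² × #TMs × 4). It assumes one TaskManager per node with 16 slots (one per CPU), which the post does not state explicitly; the object and member names are hypothetical.

```scala
// Rough size of the network buffer pool from the first post's configuration.
// Assumptions (not stated in the thread): one TaskManager per node and
// 16 task slots per TaskManager (one per CPU).
object NetworkBufferBudget {
  val bufferSizeBytes: Long = 32768L    // taskmanager.network.bufferSizeInBytes
  val numberOfBuffers: Long = 98304L    // taskmanager.network.numberOfBuffers
  val taskManagers: Long = 15L          // one TaskManager per node (assumed)
  val slotsPerTaskManager: Long = 16L   // one slot per CPU (assumed)

  /** Size of the network buffer pool on each TaskManager, in MiB. */
  def bufferPoolMiB: Long = numberOfBuffers * bufferSizeBytes / (1024L * 1024L)

  /** Heuristic from the Flink 1.x configuration docs: slots-per-TM^2 * #TMs * 4. */
  def suggestedBuffers: Long = slotsPerTaskManager * slotsPerTaskManager * taskManagers * 4L

  def main(args: Array[String]): Unit = {
    println(s"network buffer pool per TaskManager: $bufferPoolMiB MiB")
    println(s"heuristic buffer count: $suggestedBuffers (configured: $numberOfBuffers)")
  }
}
```

With these inputs the pool is 3072 MiB per TaskManager, while the heuristic suggests 15360 buffers. Per the Flink 1.x docs, taskmanager.memory.fraction is applied after subtracting the network buffers, so that pool is unavailable both to the sorter and to user code.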
Hi,
could you provide the log outputs for your job (ideally with debug logging enabled)? Best, Stefan
I don't know whether the error I usually get is related to this one, but it is very similar and happens randomly... I still have to figure out its root cause:
java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:43)) -> Map (Map at main(Jsonizer.java:90))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: -2
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 3 more
Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: -2
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
    at java.util.ArrayList.elementData(ArrayList.java:418)
    at java.util.ArrayList.get(ArrayList.java:431)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:219)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:245)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:255)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
    at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)

On Wed, Aug 31, 2016 at 5:57 PM, Stefan Richter <[hidden email]> wrote:
Sure. Here you can find the complete log file. I still cannot get past the issue. Thank you for your help.

2016-08-31 18:15 GMT+02:00 Flavio Pompermaier <[hidden email]>:
Hi,
unfortunately, the log does not contain the information required for this case. It seems that a sender to the SortMerger failed. The best way to track down this problem is to look at the exceptions reported in the web front-end for the failing job. Could you check whether any exceptions are reported there and send them to us? Best, Stefan
Hi Stefan, thank you so much for the answer. OK, I'll do it as soon as possible. For the sake of argument, could the issue be related to the low number of blocks? I noticed that, by default, the Flink implementation sets the number of blocks to the input count (which is very large). So with few, large blocks, maybe they don't fit in memory somewhere... Thank you again. Andrea

2016-09-02 10:51 GMT+02:00 Stefan Richter <[hidden email]>:
OK, I'm still struggling with ALS. Now I'm running with a dataset of 2M users, 250K items and 700 ratings per user (1.4B ratings), with 50 latent factors, 400 blocks (numOfBlocks) and a degree of parallelism of 400. I got the error again; in the JobManager log I found the previously mentioned exception:

09/06/2016 19:30:18 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))(62/400) switched to FAILED
java.lang.Exception: The data preparation for task 'CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
    at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
    at org.apache.flink.runtime.operators.CoGroupDriver.prepare(CoGroupDriver.java:97)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
    ... 5 more
Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.
    at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
Caused by: org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Fatal error at remote task manager 'cloud-20.dima.tu-berlin.de/130.149.21.24:6121'.

The corresponding TaskManager log then shows:

java.lang.OutOfMemoryError: Java heap space
2016-09-06 19:30:17,958 ERROR org.apache.flink.runtime.taskmanager.TaskManager - Could not update input data location for task CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:572)). Trying to fail task.
java.lang.IllegalStateException: There has been an error in the channel.
    at org.apache.flink.shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:173)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.addInputChannel(PartitionRequestClientHandler.java:78)
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClient.requestSubpartition(PartitionRequestClient.java:104)
    at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:117)
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.updateInputChannel(SingleInputGate.java:284)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(TaskManager.scala:1076)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
    at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$3$$anonfun$apply$1.apply(TaskManager.scala:1075)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Each node has 32 GB of memory, and I'm working with taskmanager.heap.mb = 28672. I also tried different memory fractions: taskmanager.memory.fraction = 0.5, 0.6 and 0.8. I hope this gives you enough information. Thank you for your help. Andrea

2016-09-02 11:30 GMT+02:00 ANDREA SPINA <[hidden email]>:
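To see how the memory fractions tried above trade managed memory against free heap, here is a rough sketch (an addition, not from the thread). It assumes the network settings from the first post are unchanged (98304 buffers of 32 KiB, i.e. 3072 MiB), 16 slots per TaskManager, and that the full taskmanager.heap.mb is usable; the JVM actually reports somewhat less, so treat the results as upper bounds. The names are hypothetical.

```scala
// Upper-bound estimate of the heap left per slot for user objects
// (such as ALS factor blocks) after network buffers and managed memory.
// Assumptions: network settings from the first post, 16 slots per
// TaskManager, and the whole configured heap usable by Flink.
object HeapSplit {
  val heapMiB: Long = 28672L            // taskmanager.heap.mb
  val networkMiB: Long = 3072L          // 98304 buffers * 32 KiB (assumed unchanged)
  val slotsPerTaskManager: Long = 16L   // assumed: one slot per CPU

  /** Free heap per slot in MiB, for taskmanager.memory.fraction given in percent. */
  def freeHeapPerSlotMiB(fractionPercent: Long): Long = {
    val afterNetwork = heapMiB - networkMiB            // fraction applies to this remainder
    val managed = afterNetwork * fractionPercent / 100 // reserved for sorting, hashing, caching
    (afterNetwork - managed) / slotsPerTaskManager     // shared by concurrently running tasks
  }

  def main(args: Array[String]): Unit =
    for (p <- Seq(50L, 60L, 80L))
      println(s"taskmanager.memory.fraction = 0.${p / 10}: ~${freeHeapPerSlotMiB(p)} MiB free heap per slot")
}
```

Under these assumptions, going from 0.5 to 0.8 shrinks the free heap per slot from about 800 MiB to about 320 MiB, so raising the fraction works against a "Java heap space" error rather than for it.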
Hi Andrea,

the exception says that you don't have enough heap memory available to keep a factor block in memory. Whenever the user function is called, the block has to be created as an object on the heap. You can try the following to solve the problem:

1. Further decrease taskmanager.memory.fraction: this causes the TaskManager to allocate less managed memory and leaves more free heap memory available.
2. Decrease the number of slots per TaskManager: this decreases the number of concurrently running user functions and thus the number of objects that have to be kept on the heap.
3. Increase the number of ALS blocks (`als.setBlocks(numberBlocks)`): this increases the number of blocks into which the factor matrices are split. A larger number means that each individual block is smaller and thus needs less memory on the heap.

I hope this helps you solve the problem.

Cheers,
Till

On Wed, Sep 7, 2016 at 11:57 AM, ANDREA SPINA <[hidden email]> wrote:
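To get a feeling for suggestion 3, the following sketch (not from the thread) estimates the raw payload of one user block for the second run (2M users, about 700 ratings per user, rank 50) as the block count grows. It assumes users and ratings are spread evenly across blocks and costs 12 bytes per rating (an Int index plus a Double value). It ignores JVM object overhead, serialization buffers and the factor messages exchanged between blocks, so real heap usage is higher. The names are hypothetical and do not describe FlinkML internals.

```scala
// Back-of-the-envelope payload of one ALS user block versus setBlocks(n).
// Counts only dense factors (rank doubles per user) and ratings
// (assumed 12 bytes each). Even distribution across blocks is assumed.
object AlsBlockEstimate {
  final case class BlockSize(usersPerBlock: Long, ratingsPerBlock: Long, factorBytes: Long, ratingBytes: Long)

  val bytesPerRating: Long = 12L // assumed: Int index + Double rating
  val bytesPerDouble: Long = 8L

  def perBlock(users: Long, ratingsPerUser: Long, rank: Long, blocks: Long): BlockSize = {
    val usersPerBlock = (users + blocks - 1) / blocks // ceiling division
    val ratingsPerBlock = usersPerBlock * ratingsPerUser
    BlockSize(
      usersPerBlock,
      ratingsPerBlock,
      factorBytes = usersPerBlock * rank * bytesPerDouble,
      ratingBytes = ratingsPerBlock * bytesPerRating)
  }

  def main(args: Array[String]): Unit = {
    // Second run from the thread: 2M users, ~700 ratings per user, rank 50.
    for (blocks <- Seq(400L, 800L, 1600L)) {
      val b = perBlock(users = 2000000L, ratingsPerUser = 700L, rank = 50L, blocks = blocks)
      println(f"blocks=$blocks%5d users/block=${b.usersPerBlock}%6d " +
        f"factors=${b.factorBytes / 1e6}%.1f MB ratings=${b.ratingBytes / 1e6}%.1f MB")
    }
  }
}
```

The scaling matters more than the absolute numbers: doubling the block count halves every per-block quantity, e.g. about 42 MB of rating payload per block at 400 blocks versus about 21 MB at 800.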