Hello,
I have a Flink 1.1.3 batch application that makes a simple aggregation but freezes when collect() is called when the app is deployed on a ha-enabled yarn cluster (it works on a local cluster). Just before it hangs, I have the following deserialization error in the logs :

(...)
2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at java.util.HashMap.readObject(HashMap.java:1184)
    at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
    at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
    at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Do you have an idea of what can be wrong?
I have no problems with other batch applications, just with this one. Why is it trying to recover the jobs in the first place?

Thanks,
Arnaud

________________________________

The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
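For readers unfamiliar with the message "invalid type code: 00": Java's ObjectInputStream reports it when it reaches a position where a serialization type tag is expected and finds a zero byte instead. The sketch below reproduces the same message with a hand-built stream (a valid header followed by one zero byte). It only illustrates what the exception means. It makes no claim about how the job graph file in this thread ended up unreadable, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamConstants;
import java.io.StreamCorruptedException;

// Hypothetical demo class, not part of Flink.
final class InvalidTypeCodeDemo {

    /** A valid serialization stream header followed by a single zero byte. */
    static byte[] headerThenZeroByte() {
        short magic = ObjectStreamConstants.STREAM_MAGIC;     // 0xACED
        short version = ObjectStreamConstants.STREAM_VERSION; // 5
        return new byte[] {
            (byte) (magic >>> 8), (byte) magic,
            (byte) (version >>> 8), (byte) version,
            0x00 // zero byte where a type code is expected
        };
    }

    /** Tries to read one object; returns the corruption message, or null if the read succeeded. */
    static String readAndReport(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readAndReport(headerThenZeroByte()));
    }
}
```

In the trace above the exception is raised several levels deep, inside HashMap.readObject, so the unreadable byte sits somewhere in the middle of the serialized data rather than at its start.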
Hey Arnaud,
could this be a leftover job that is recovered from ZooKeeper? Recovery only happens if the configured ZK root contains data.

A job is removed from ZooKeeper only if it terminates (e.g. finishes, fails terminally without restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure.

The complete JM logs would be helpful to further check what's happening there.

– Ufuk
Hi,
Don't think so. I always delete the ZK path before launching the batch (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears only before the collect() phase, not at the beginning.

Full log is available here: https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r

Thanks,
Arnaud
Hi,
Any news? It may be caused by an oversized akka payload. There are many lines like this one in the log:

akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage was 69074412 bytes

How do I set akka's maximum-payload-bytes in my flink cluster? https://issues.apache.org/jira/browse/FLINK-2373 is not clear about that. I do not use ExecutionEnvironment.createRemoteEnvironment() but ExecutionEnvironment.getExecutionEnvironment().

Do I have to change the way I'm doing things? How?

Thanks,
Arnaud
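To see how far over the limit the discarded messages are, the two byte counts can be pulled out of an OversizedPayloadException log line. The following is a minimal sketch using only the JDK. The regular expression is modelled on the single log line quoted in this message, so it is an assumption that other Akka versions word the line the same way, and the class and method names are hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Akka.
final class OversizedPayloadLine {

    // Assumed wording, copied from the log line quoted in the thread.
    private static final Pattern SIZES = Pattern.compile(
        "max allowed size (\\d+) bytes, actual size of encoded (.+?) was (\\d+) bytes");

    /** Returns {maxAllowedBytes, actualBytes}, or null if the line does not match. */
    static long[] parseSizes(String logLine) {
        Matcher m = SIZES.matcher(logLine);
        if (!m.find()) {
            return null;
        }
        return new long[] { Long.parseLong(m.group(1)), Long.parseLong(m.group(3)) };
    }

    public static void main(String[] args) {
        String line = "akka.remote.OversizedPayloadException: Discarding oversized payload sent to "
            + "Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: "
            + "max allowed size 10485760 bytes, actual size of encoded class "
            + "org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage "
            + "was 69074412 bytes";
        long[] sizes = parseSizes(line);
        // Print the limit, the actual size, and how many times over the limit the message was.
        System.out.printf("limit=%d actual=%d ratio=%.1f%n",
            sizes[0], sizes[1], (double) sizes[1] / sizes[0]);
    }
}
```

For the line quoted here, the message is between six and seven times larger than the 10485760-byte limit.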
I also don't get why the job is recovering, but the oversized message is very likely the cause of the freezing collect(), because the data set is gathered via Akka.
You can configure the frame size via "akka.framesize", which defaults to 10485760b (10 MB). Is the collected result larger than that? Could you try to increase the frame size and report back?

– Ufuk
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > > at > > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool. > > java:1339) at > > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:197 > > 9) at > > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThrea > > d.java:107) > > > > > > Do you have an idea of what can be wrong? I have no problems with > > other batch applications, just with this one. Why is it trying to recover the jobs In > the first place ? > > Thanks, > > Arnaud > > > > ________________________________ > > > > L'intégrité de ce message n'étant pas assurée sur internet, la société > > expéditrice ne peut être tenue responsable de son contenu ni de ses > > pièces jointes. Toute utilisation ou diffusion non autorisée est > > interdite. Si vous n'êtes pas destinataire de ce message, merci de le détruire et d'avertir > l'expéditeur. > > > > The integrity of this message cannot be guaranteed on the Internet. > > The company that sent this message cannot therefore be held liable for > > its content nor attachments. Any unauthorized use or dissemination is > > prohibited. If you are not the intended recipient of this message, then please delete > it and notify the sender. > > > > |
Hi Ufuk,
Yes, I have a large data set to collect for a data science job that cannot be distributed easily. Increasing akka.framesize does get rid of the collect() hang, thanks (maybe you should highlight this parameter in the collect() documentation; 10 MB is not that big).

However, my job manager now fails with an OutOfMemory error. Although I have set

jobmanager.heap.mb: 8192

in my flink-conf.yaml, the logs show that it was created with less memory (1374 MiB):

2016-12-08 13:50:13,808 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - --------------------------------------------------------------------------------
2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: datcrypt
2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 1374 MiBytes
2016-12-08 13:50:13,810 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/java/default
2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.6.3
2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx1434M
2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_000001/jobmanager.log

Is there a Flink command-line option or environment variable that overrides it, or am I missing something?
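On the akka.framesize point, a rough way to see in advance whether a result is likely to exceed the frame size is to estimate its serialized size on the client. The following is only a sketch under stated assumptions: the class and method names are hypothetical, and it uses plain Java serialization as a proxy, whereas Flink uses its own serializers, so the real payload size will differ. The 10485760-byte default and the 69074412-byte message are the figures reported earlier in this thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

// Hypothetical helper, not part of Flink.
final class FrameSizeCheck {

    // Default akka.framesize quoted in this thread: 10485760 bytes (10 MB).
    static final long DEFAULT_FRAME_SIZE_BYTES = 10485760L;

    // Rough estimate via Java serialization (assumption: only a proxy for
    // the size of the message Flink actually sends).
    static long estimateSerializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("could not serialize sample", e);
        }
        return bytes.size();
    }

    // True if a payload of the given size fits into one frame.
    static boolean fitsInFrame(long payloadBytes, long frameSizeBytes) {
        return payloadBytes <= frameSizeBytes;
    }

    public static void main(String[] args) {
        // Small stand-in for a collected result.
        ArrayList<String> sample = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            sample.add("record-" + i);
        }
        long size = estimateSerializedSize(sample);
        System.out.println("estimated bytes: " + size
                + ", fits default frame: " + fitsInFrame(size, DEFAULT_FRAME_SIZE_BYTES));

        // The oversized message size reported in the thread does not fit.
        System.out.println("69074412 bytes fits default frame: "
                + fitsInFrame(69074412L, DEFAULT_FRAME_SIZE_BYTES));
    }
}
```

If the estimate is anywhere near the limit, raising akka.framesize (as done here) or writing the result to a file instead of calling collect() are the two obvious options.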
-- Arnaud
|
Good point with the collect() docs. Would you mind opening a JIRA issue for that?
I'm not sure whether you can specify it via that key for YARN. Can you try to use -yjm 8192 when submitting the job?

Looping in Robert, who knows best whether this config key is picked up or not for YARN.

– Ufuk
|
-yjm works, and suits me better than a global flink-conf.yaml parameter. I had looked for a command-line parameter like that but missed it in the docs; my mistake.
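To confirm that the setting took effect, the JobManager startup lines can be checked again after resubmitting. Below is a minimal sketch, assuming the log keeps the "Maximum heap size: N MiBytes" format shown earlier in this thread; the class and method names are hypothetical. Note that the reported heap is expected to stay somewhat below the value passed to -yjm, since part of the YARN container is reserved for non-heap memory.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink.
final class JobManagerHeapCheck {

    // Matches the startup line format seen in the jobmanager.log excerpt.
    private static final Pattern MAX_HEAP =
            Pattern.compile("Maximum heap size: (\\d+) MiBytes");

    // Returns the first reported maximum heap in MiB, or -1 if no line matches.
    static long reportedMaxHeapMiB(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = MAX_HEAP.matcher(line);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Line copied from the log excerpt posted in this thread.
        List<String> lines = Arrays.asList(
                "2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner"
                        + " - Maximum heap size: 1374 MiBytes");
        System.out.println("reported max heap (MiB): " + reportedMaxHeapMiB(lines));
    }
}
```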
Thanks, Arnaud -----Message d'origine----- De : Ufuk Celebi [mailto:[hidden email]] Envoyé : jeudi 8 décembre 2016 14:43 À : LINZ, Arnaud <[hidden email]>; [hidden email] Cc : [hidden email] Objet : RE: Collect() freeze on yarn cluster on strange recover/deserialization error Good point with the collect() docs. Would you mind opening a JIRA issue for that? I'm not sure whether you can specify it via that key for YARN. Can you try to use -yjm 8192 when submitting the job? Looping in Robert who knows best whether this config key is picked up or not for YARN. – Ufuk On 8 December 2016 at 14:05:41, LINZ, Arnaud ([hidden email]) wrote: > Hi Ufuk, > > Yes, I have a large set of data to collect for a data science job that > cannot be distributed easily. Increasing the akka.framesize size do > get rid of the collect hang (maybe you should highlight this parameter > in the collect() documentation, 10Mb si not that big), thanks. > > However my job manager now fails with OutOfMemory. > > Despite the fact that I have setup > jobmanager.heap.mb: 8192 > > in my flink-conf.yaml, logs shows that it was created with less memory (1374 Mb) : > > 2016-12-08 13:50:13,808 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - > ---------------------------------------------------------------------- > ---------- > 2016-12-08 13:50:13,809 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, > Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC) > 2016-12-08 13:50:13,809 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - Current user: datcrypt > 2016-12-08 13:50:13,809 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - > 1.7/24.45-b08 > 2016-12-08 13:50:13,809 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - Maximum heap size: 1374 MiBytes > 2016-12-08 13:50:13,810 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - 
JAVA_HOME: /usr/java/default > 2016-12-08 13:50:13,811 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - Hadoop version: 2.6.3 > 2016-12-08 13:50:13,811 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - JVM Options: > 2016-12-08 13:50:13,811 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - -Xmx1434M > 2016-12-08 13:50:13,811 INFO > org.apache.flink.yarn.YarnApplicationMasterRunner > - > -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/cont > ainer_e17_1480512120243_3635_01_000001/jobmanager.log > > > Is there a command line option of flink / env variable that overrides > it or am I missing something ? > -- Arnaud > > -----Message d'origine----- > De : Ufuk Celebi [mailto:[hidden email]] Envoyé : jeudi 8 décembre > 2016 10:49 À : LINZ, Arnaud ; [hidden email] Objet : RE: > Collect() freeze on yarn cluster on strange recover/deserialization > error > > I also don't get why the job is recovering, but the oversized message > is very likely the cause for the freezing collect, because the data set is gather via Akka. > > You can configure the frame size via "akka.framesize", which defaults > to 10485760b > (10 MB). > > Is the collected result larger than that? Could you try to increase > the frame size and report back? > > – Ufuk > > On 7 December 2016 at 17:57:22, LINZ, Arnaud ([hidden email]) wrote: > > Hi, > > > > Any news? It's maybe caused by an oversized akka payload (many > > akka.remote.OversizedPayloadException: Discarding oversized payload > > sent to Actor[akka.tcp://flink@172.21.125.20:39449/user/jobmanager#-1264474132]: > > max allowed size 10485760 bytes, actual size of encoded class > > org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMe > > ss > > age > > was 69074412 bytes in the log) > > > > How do I set akka's maximum-payload-bytes in my flink cluster? > > > > https://issues.apache.org/jira/browse/FLINK-2373 is not clear about > > that. 
I do not use ExecutionEnvironment.createRemoteEnvironment() but ExecutionEnvironment.getExecutionEnvironment(). > > > > Do I have to change the way I'm doing things ? How ? > > > > Thanks, > > Arnaud > > > > -----Message d'origine----- > > De : LINZ, Arnaud > > Envoyé : mercredi 30 novembre 2016 08:59 À : [hidden email] > > Objet : RE: Collect() freeze on yarn cluster on strange > > recover/deserialization error > > > > Hi, > > > > Don't think so. I always delete the ZK path before launching the > > batch (with /usr/bin/zookeeper-client -server > > $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and > > the "recovery" log line appears only before > the collect() phase, not at the beginning. > > > > Full log is availlable here : > > https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r > > > > Thanks, > > Arnaud > > > > > > -----Message d'origine----- > > De : Ufuk Celebi [mailto:[hidden email]] Envoyé : mardi 29 novembre > > 2016 18:43 À : LINZ, Arnaud ; [hidden email] Objet : Re: > > Collect() freeze on yarn cluster on strange recover/deserialization > > error > > > > Hey Arnaud, > > > > could this be a left over job that is recovered from ZooKeeper? > > Recovery only happens if the configured ZK root contains data. > > > > A job is removed from ZooKeeper only if it terminates (e.g. > > finishes, fails terminally w/o restarting, cancelled). If you just > > shut down the cluster this > is treated as a failure. > > > > – Ufuk > > > > The complete JM logs will be helpful to further check what's happening there. > > > > > > On 29 November 2016 at 18:15:16, LINZ, Arnaud ([hidden email]) wrote: > > > Hello, > > > > > > I have a Flink 1.1.3 batch application that makes a simple > > > aggregation but freezes when > > > collect() is called when the app is deployed on a ha-enabled yarn > > > cluster (it works on a local cluster). > > > Just before it hangs, I have the following deserialization error in the logs : > > > > > > (...) 
> > > 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> > > 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> > > 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> > > 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> > > java.io.StreamCorruptedException: invalid type code: 00
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at java.util.HashMap.readObject(HashMap.java:1184)
> > >     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
> > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >     at java.lang.reflect.Method.invoke(Method.java:606)
> > >     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> > >     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> > >     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> > >     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
> > >     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
> > >     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
> > >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> > >     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> > >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >
> > > Do you have an idea of what can be wrong? I have no problems with other batch applications, just with this one. Why is it trying to recover the jobs in the first place?
> > > Thanks,
> > > Arnaud
> > >
> > > ________________________________
> > >
> > > The integrity of this message cannot be guaranteed on the Internet. The company that sent this message cannot therefore be held liable for its content nor attachments. Any unauthorized use or dissemination is prohibited. If you are not the intended recipient of this message, then please delete it and notify the sender.
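
As a side note on the numbers quoted in this thread: the log reports a 69074412-byte message against a 10485760-byte limit. The comparison can be sketched in a few lines of Java. This is only an illustration: the class and method names are hypothetical helpers (they are not part of Flink or Akka), and doubling the limit until the payload fits is an arbitrary assumption, not a recommendation from the thread. The returned string uses the same "<bytes>b" form as the default quoted above.

```java
public class FrameSizeCheck {
    // Default akka.framesize quoted in the thread (10 MB).
    static final long DEFAULT_FRAME_BYTES = 10_485_760L;

    // True when a message of payloadBytes is larger than the frame limit.
    static boolean exceedsFrame(long payloadBytes, long frameBytes) {
        return payloadBytes > frameBytes;
    }

    // Hypothetical helper: doubles the frame size until the payload fits,
    // then formats it as "<bytes>b".
    static String suggestFrameSize(long payloadBytes, long frameBytes) {
        if (frameBytes <= 0) {
            throw new IllegalArgumentException("frameBytes must be positive");
        }
        long size = frameBytes;
        while (size < payloadBytes) {
            size *= 2;
        }
        return size + "b";
    }

    public static void main(String[] args) {
        long payload = 69_074_412L; // size reported in the OversizedPayloadException
        System.out.println(exceedsFrame(payload, DEFAULT_FRAME_BYTES));     // true
        System.out.println(suggestFrameSize(payload, DEFAULT_FRAME_BYTES)); // 83886080b
    }
}
```

A larger frame size only lets the message through; the collected result still has to fit in the receiving JVM's heap.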
Great! :)
On 8 December 2016 at 15:28:05, LINZ, Arnaud ([hidden email]) wrote:
> -yjm works, and suits me better than a global flink-conf.yaml parameter. I looked for a command-line parameter like that, but I missed it in the docs, my mistake.
> Thanks,
> Arnaud
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:[hidden email]]
> Sent: Thursday, 8 December 2016 14:43
> To: LINZ, Arnaud; [hidden email]
> Cc: [hidden email]
> Subject: RE: Collect() freeze on yarn cluster on strange recover/deserialization error
>
> Good point with the collect() docs. Would you mind opening a JIRA issue for that?
>
> I'm not sure whether you can specify it via that key for YARN. Can you try to use -yjm 8192 when submitting the job?
>
> Looping in Robert who knows best whether this config key is picked up or not for YARN.
>
> – Ufuk
>
> On 8 December 2016 at 14:05:41, LINZ, Arnaud ([hidden email]) wrote:
> > Hi Ufuk,
> >
> > Yes, I have a large set of data to collect for a data science job that cannot be distributed easily. Increasing akka.framesize does get rid of the collect hang (maybe you should highlight this parameter in the collect() documentation; 10 MB is not that big), thanks.
> >
> > However, my job manager now fails with OutOfMemory.
> >
> > Despite the fact that I have set
> > jobmanager.heap.mb: 8192
> >
> > in my flink-conf.yaml, the logs show that it was created with less memory (1374 MB):
> >
> > 2016-12-08 13:50:13,808 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - --------------------------------------------------------------------------------
> > 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / JobManager (Version: 1.1.3, Rev:8e8d454, Date:10.10.2016 @ 13:26:32 UTC)
> > 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: datcrypt
> > 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
> > 2016-12-08 13:50:13,809 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 1374 MiBytes
> > 2016-12-08 13:50:13,810 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/java/default
> > 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.6.3
> > 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> > 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx1434M
> > 2016-12-08 13:50:13,811 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/data/1/hadoop/yarn/log/application_1480512120243_3635/container_e17_1480512120243_3635_01_000001/jobmanager.log
> >
> > Is there a command-line option of Flink or an environment variable that overrides it, or am I missing something?
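
For anyone wanting to reproduce the "Maximum heap size" figure discussed in this thread outside the JobManager log, the standard `Runtime` API exposes the same kind of value. A minimal sketch follows; the class name is hypothetical, and the program reports the heap limit of whichever JVM runs it, so reading the JobManager log line remains the direct check of whether a setting such as -yjm took effect.

```java
public class HeapCheck {
    // Converts a byte count to whole MiB (1 MiB = 1024 * 1024 bytes).
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }

    public static void main(String[] args) {
        // maxMemory() is the maximum amount of memory this JVM will attempt to use.
        long maxHeapMiB = toMiB(Runtime.getRuntime().maxMemory());
        System.out.println("Maximum heap size: " + maxHeapMiB + " MiBytes");
    }
}
```

The reported value can be somewhat lower than the -Xmx setting, as in the log quoted in the thread (1374 MiBytes with -Xmx1434M).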