Posted by LINZ, Arnaud
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/Collect-freeze-on-yarn-cluster-on-strange-recover-deserialization-error-tp10378p10384.html
Hi,
Don't think so. I always delete the ZK path before launching the batch (with /usr/bin/zookeeper-client -server $FLINK_HA_ZOOKEEPER_SERVERS rmr $FLINK_HA_ZOOKEEPER_PATH_BATCH), and the "recovery" log line appears only before the collect() phase, not at the beginning.
The full log is available here:
https://ftpext.bouyguestelecom.fr/?u=JDhCUdcAImsANZQdys86yID6UNq8H2r
Thanks,
Arnaud
-----Original Message-----
From: Ufuk Celebi [mailto:[hidden email]]
Sent: Tuesday, 29 November 2016 18:43
To: LINZ, Arnaud <[hidden email]>; [hidden email]
Subject: Re: Collect() freeze on yarn cluster on strange recover/deserialization error
Hey Arnaud,
could this be a leftover job that is recovered from ZooKeeper? Recovery only happens if the configured ZK root contains data.
A job is removed from ZooKeeper only if it terminates (e.g. finishes, fails terminally w/o restarting, or is cancelled). If you just shut down the cluster, this is treated as a failure.
– Ufuk
The complete JM logs will be helpful to further check what's happening there.
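
The removal rule described above can be sketched as a toy in-memory model. This is not Flink code: the class `JobStoreSketch`, the `Outcome` enum, and the method names are hypothetical, and the map merely stands in for the data kept under the ZK root. It only illustrates why a job that was interrupted by a cluster shutdown would still be present for recovery, while a terminated job would not.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the rule "a job leaves the store only when it terminates".
// All names are hypothetical; the map stands in for the configured ZK root.
public class JobStoreSketch {

    public enum Outcome { FINISHED, FAILED_TERMINALLY, CANCELLED, CLUSTER_SHUTDOWN }

    private final Map<String, String> store = new LinkedHashMap<>();

    // Submitting a job records its graph in the store.
    public void submit(String jobId, String jobGraph) {
        store.put(jobId, jobGraph);
    }

    // Only a terminal outcome removes the entry; a plain cluster shutdown
    // is treated as a failure and leaves the entry behind.
    public void onJobEnd(String jobId, Outcome outcome) {
        if (outcome != Outcome.CLUSTER_SHUTDOWN) {
            store.remove(jobId);
        }
    }

    // Recovery has something to do only if the store still contains data.
    public List<String> recover() {
        return new ArrayList<>(store.keySet());
    }

    public static void main(String[] args) {
        JobStoreSketch zk = new JobStoreSketch();
        zk.submit("job-a", "graph-a");
        zk.onJobEnd("job-a", Outcome.FINISHED);
        zk.submit("job-b", "graph-b");
        zk.onJobEnd("job-b", Outcome.CLUSTER_SHUTDOWN);
        System.out.println("Jobs left for recovery: " + zk.recover());
    }
}
```

In this model, deleting the whole root before a launch corresponds to starting with an empty map, so nothing from an earlier run could be recovered.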
On 29 November 2016 at 18:15:16, LINZ, Arnaud (
[hidden email]) wrote:
> Hello,
>
> I have a Flink 1.1.3 batch application that performs a simple aggregation
> but freezes when collect() is called if the app is deployed on an
> HA-enabled YARN cluster (it works on a local cluster).
> Just before it hangs, I have the following deserialization error in the logs:
>
> (...)
> 2016-11-29 15:10:10,422 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSink (collect()) (1/4) (10cae0de2f4e7b6d71f21209072f7c96) switched from DEPLOYING to RUNNING
> 2016-11-29 15:10:13,175 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (2/4) (c098cf691c28364ca47d322c7a76259a) switched from RUNNING to FINISHED
> 2016-11-29 15:10:17,816 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Reduce(Reduce at agregation(YarnAnonymiser.java:114)) -> Map (Key Remover) (1/4) (aa6953c3c3a7c9d06ff714e13d020e38) switched from RUNNING to FINISHED
> 2016-11-29 15:10:38,060 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
> 2016-11-29 15:10:38,167 ERROR org.apache.flink.yarn.YarnJobManager - Fatal error: Failed to recover jobs.
> java.io.StreamCorruptedException: invalid type code: 00
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at java.util.HashMap.readObject(HashMap.java:1184)
>     at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:606)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:58)
>     at org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle.getState(FileSerializableStateHandle.java:35)
>     at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraphs(ZooKeeperSubmittedJobGraphStore.java:173)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply$mcV$sp(JobManager.scala:530)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2$$anonfun$apply$mcV$sp$2.apply(JobManager.scala:526)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply$mcV$sp(JobManager.scala:526)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$2.apply(JobManager.scala:522)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
> Do you have any idea what could be wrong? I have no problems with
> other batch applications, just with this one. Why is it trying to recover the jobs in the first place?
> Thanks,
> Arnaud
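
For readers unfamiliar with the `java.io.StreamCorruptedException: invalid type code: 00` message in the quoted log, the following minimal sketch shows one way plain Java serialization can produce it: a valid stream header followed by a zero byte where an object type code is expected. The class name `CorruptStreamDemo` and the hand-built byte array are illustrative assumptions; this does not establish what actually corrupted the data read during job recovery in this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;

// Illustrative only: feeds ObjectInputStream a valid header followed by a
// stray 0x00 byte, which is not a valid serialization type code.
public class CorruptStreamDemo {

    public static String readCorrupted() throws IOException, ClassNotFoundException {
        byte[] bytes = {
            (byte) 0xAC, (byte) 0xED, // stream magic number
            0x00, 0x05,               // stream version
            0x00                      // zero byte where a type code should be
        };
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            in.readObject();
            return "no error";
        } catch (StreamCorruptedException e) {
            // The message names the unexpected byte in hexadecimal.
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readCorrupted());
    }
}
```

The same exception type would be expected whenever the bytes being deserialized were truncated, overwritten, or written in a different format than the reader assumes, which is why the state of the stored file is worth checking alongside the JM logs.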