Hi !
We are trying Flink in HA mode. Our application is a streaming application with a windowing mechanism. We set the following in the Flink YAML config:

state.backend: filesystem
recovery.mode: zookeeper
recovery.zookeeper.quorum: <Our quorum>
recovery.zookeeper.path.root: <path>
recovery.zookeeper.storageDir: <storageDir>
recovery.backend.fs.checkpointdir: <pathcheckpoint>
yarn.application-attempts: 100
In case of an application crash, we want the pending windows to be restored when the application restarts. Is the pending data stored in the <storageDir>/blob directory?

Also, we are trying to write a script that restarts the application after the maximum number of attempts is exceeded, resuming from the last pending window. How can I do that? Is a simple restart of the application enough, or do I have to "clean" the recovery.zookeeper.path.root?
Thanks !
Thomas Lamirault
On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault <[hidden email]> wrote:

> We are trying flink in HA mode.

Great to hear!

> We set in the flink yaml :
>
> state.backend: filesystem
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: <Our quorum>
> recovery.zookeeper.path.root: <path>
> recovery.zookeeper.storageDir: <storageDir>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: both state.backend.fs.checkpointdir and recovery.zookeeper.path.root should point to a file system path.

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete

> We want in case of application crash, the pending window has to be restore
> when the application restart.
>
> Pending data are store into the <storageDir>/blob directory ?
>
> Also, we try to write a script who restart the application after exceed the
> max attempts, with the last pending window.
>
> How can I do that ? A simple restart of the application is enough, or do I
> have to "clean" the recovery.zookeeper.path.root ?

Restore happens automatically to the most recently checkpointed state. Everything under <storageDir> contains the actual state (including JARs and JobGraph). ZooKeeper contains pointers to this state. Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want to do it manually, you need to restart the JobManager/TaskManager instances. The application will be recovered automatically from ZooKeeper/state backend.

Does this help?

– Ufuk
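Putting this correction together with the settings from the first mail, the flink-conf.yaml section would look roughly like the sketch below. The quorum, paths and attempt count are the placeholders from this thread, not suggested values:

state.backend: filesystem
state.backend.fs.checkpointdir: <pathcheckpoint>
recovery.mode: zookeeper
recovery.zookeeper.quorum: <Our quorum>
recovery.zookeeper.path.root: <path>
recovery.zookeeper.storageDir: <storageDir>
yarn.application-attempts: 100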
Thanks for the quick reply !
> state.backend.fs.checkpointdir

It is actually pointing to an HDFS directory, but I will modify the recovery.zookeeper.path.root.

> This is only relevant if you are using YARN. From your complete

Yes, I omitted to say that we will use YARN.

> Does this help?

Yes, a lot :-)

Thomas
After setting this configuration, I get some exceptions:

java.lang.Exception: Could not restore checkpointed state to operators and functions
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
    at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
    ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
    at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
    at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
    ... 13 more

If I run the application in non-HA mode, there is no problem. What can cause this kind of error?

Thanks

Thomas
I have resolved my issues.
It seems that Avro had difficulties with my POJO. I changed the handling of null values and it works fine now.

But is there a way to cancel the old JobGraphs that are stuck in restarting status and keep only the last one to restart, other than cancelling the JobIDs manually?

Thanks

Thomas
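Thomas does not show his actual change, but as a purely illustrative sketch of what "changing the handling of null values" in a POJO used inside window state can look like (class and field names below are invented for this example, they are not from his job):

    // Hypothetical event POJO; names are invented for illustration.
    public class SessionEvent implements java.io.Serializable {

        private static final long serialVersionUID = 1L;

        public String userId = "";

        // Initialize the map instead of leaving the field null, so the
        // serialized window state always contains a well-formed (possibly empty) map.
        public java.util.HashMap<String, Long> counts = new java.util.HashMap<>();

        // Flink treats this as a POJO: public no-argument constructor plus
        // public fields (or getters/setters).
        public SessionEvent() {
        }
    }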
Hi Thomas,

To avoid having jobs forever restarting, you have to cancel them manually (from the web interface or the bin/flink client).

Also, you can set an appropriate restart strategy (in 1.0-SNAPSHOT), which limits the number of retries. This way the retrying will eventually stop.

On Fri, Feb 19, 2016 at 4:05 PM, Thomas Lamirault <[hidden email]> wrote:
> I have resolved my issues.
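For the restart strategy mentioned above, a minimal sketch of the corresponding flink-conf.yaml keys as of Flink 1.0 is shown below; the attempt count and delay are example values, not recommendations:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

Cancelling a job that keeps restarting can be done from the client with bin/flink cancel <jobID>; the job IDs can be listed with bin/flink list.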