Flink HA

Flink HA

Thomas Lamirault

Hi!

We are trying Flink in HA mode.

Our application is a streaming application that uses windowing.

We set the following in the Flink YAML configuration:

state.backend: filesystem
recovery.mode: zookeeper
recovery.zookeeper.quorum: <Our quorum>
recovery.zookeeper.path.root: <path>
recovery.zookeeper.storageDir: <storageDir>
recovery.backend.fs.checkpointdir: <pathcheckpoint>
yarn.application-attempts: 100

If the application crashes, we want the pending window to be restored when the application restarts.

Is the pending data stored in the <storageDir>/blob directory?

We are also trying to write a script that restarts the application, with the last pending window, after the maximum number of attempts has been exceeded.

How can I do that? Is a simple restart of the application enough, or do I have to "clean" the recovery.zookeeper.path.root?

Thanks!

 

Thomas Lamirault


Re: Flink HA

Ufuk Celebi
On Thu, Feb 18, 2016 at 6:59 PM, Thomas Lamirault
<[hidden email]> wrote:
> We are trying flink in HA mode.

Great to hear!

> We set the following in the Flink YAML configuration:
>
> state.backend: filesystem
> recovery.mode: zookeeper
> recovery.zookeeper.quorum: <Our quorum>
> recovery.zookeeper.path.root: <path>
> recovery.zookeeper.storageDir: <storageDir>
> recovery.backend.fs.checkpointdir: <pathcheckpoint>

It should be state.backend.fs.checkpointdir.

Just to check: Both state.backend.fs.checkpointdir and
recovery.zookeeper.path.root should point to a file system path.
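
With that correction applied, a minimal sketch of the HA-related part of flink-conf.yaml might look as follows. The host names, ports, and paths are placeholders, not values from this thread, and the key names are those of the Flink 1.0 line (they were renamed in later releases, so check the documentation of the version you run). As far as the Flink 1.0 documentation describes them, recovery.zookeeper.path.root names a node inside ZooKeeper, while recovery.zookeeper.storageDir and state.backend.fs.checkpointdir are file system URIs.

```yaml
# Checkpointed operator state (for example, window contents)
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints   # placeholder path

# JobManager high availability through ZooKeeper
recovery.mode: zookeeper
recovery.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181        # placeholder hosts
recovery.zookeeper.path.root: /flink                         # node inside ZooKeeper
recovery.zookeeper.storageDir: hdfs:///flink/recovery        # placeholder path

# Only relevant when running on YARN
yarn.application-attempts: 100
```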

> yarn.application-attempts: 100

This is only relevant if you are using YARN. From your complete


> If the application crashes, we want the pending window to be restored
> when the application restarts.
>
> Is the pending data stored in the <storageDir>/blob directory?
>
> We are also trying to write a script that restarts the application, with
> the last pending window, after the maximum number of attempts has been
> exceeded.
>
> How can I do that? Is a simple restart of the application enough, or do I
> have to "clean" the recovery.zookeeper.path.root?

Restore happens automatically to the most recently checkpointed state.

Everything under <storageDir> contains the actual state (including
JARs and JobGraph). ZooKeeper contains pointers to this state.
Therefore, you must not delete the ZooKeeper root path.

For the automatic restart, I would recommend using YARN. If you want
to do it manually, you need to restart the JobManager/TaskManager
instances. The application will be recovered automatically from
ZooKeeper/state backend.


Does this help?

– Ufuk

RE:Flink HA

Thomas Lamirault
Thanks for the quick reply!

> state.backend.fs.checkpointdir
It actually points to an HDFS directory, but I will modify recovery.zookeeper.path.root.

> This is only relevant if you are using YARN. From your complete
Yes, I forgot to mention that we will use YARN.

> Does this help?
Yes, a lot :-)

Thomas


RE:Flink HA

Thomas Lamirault
After setting this configuration, I get the following exception:

java.lang.Exception: Could not restore checkpointed state to operators and functions
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:414)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:208)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: java.util.HashMap; invalid descriptor for field
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:710)
        at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:830)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1601)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.<init>(WindowOperator.java:446)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.restoreState(WindowOperator.java:621)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateLazy(StreamTask.java:406)
        ... 3 more
Caused by: java.lang.IllegalArgumentException: illegal signature
        at java.io.ObjectStreamField.<init>(ObjectStreamField.java:122)
        at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:708)
        ... 13 more


If I run the application in non-HA mode, there is no problem.
What can cause this kind of error?

Thanks

Thomas

RE:Flink HA

Thomas Lamirault
I have resolved my issues.
It seems that Avro had difficulties with my POJO. I changed how null values are handled, and it works fine.

But is there a way to cancel the old JobGraphs that are stuck in the restarting state and keep only the latest one to restart, other than cancelling each job ID manually?

Thanks

Thomas

Re: Flink HA

rmetzger0
Hi Thomas,

To keep jobs from restarting forever, you have to cancel them manually (from the web interface or with the bin/flink client).
You can also set an appropriate restart strategy (available in 1.0-SNAPSHOT) that limits the number of retries, so that the retrying eventually stops.
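
A minimal sketch of such a limit in flink-conf.yaml, assuming the fixed-delay restart strategy keys documented for the Flink 1.0 release. The values 3 and 10 s are illustrative only and do not come from this thread.

```yaml
# Give up after a bounded number of restart attempts instead of retrying forever
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3    # illustrative value
restart-strategy.fixed-delay.delay: 10 s    # illustrative value
```

Once the attempts are used up, the job fails for good rather than staying in the restarting state, so it no longer needs to be cancelled by hand.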

On Fri, Feb 19, 2016 at 4:05 PM, Thomas Lamirault <[hidden email]> wrote:
> I have resolved my issues.
> It seems that Avro had difficulties with my POJO. I changed how null values are handled, and it works fine.
>
> But is there a way to cancel the old JobGraphs that are stuck in the restarting state and keep only the latest one to restart, other than cancelling each job ID manually?
>
> Thanks
>
> Thomas