Fast restart of a job with a large state


Fast restart of a job with a large state

Sergey Zhemzhitsky
Hi Flinkers,

While operating various Flink jobs I've discovered that restarting a
job with a pretty large state (in my case up to 100 GB+) takes quite a
lot of time. For example, to restart a job (e.g. to update it) a
savepoint is created, and with savepoints the entire state seems to be
pushed into the distributed store (HDFS in my case) when stopping the
job and pulled back when starting the new version of the job.

What I've found so far while trying to speed up job restarts is:
- using retained (externalized) checkpoints [1]; the drawback is that
the job cannot be rescaled during the restart (see the sketch at the
end of this mail)
- keeping the state in external storage and running the jobs
stateless; the drawback is the additional network hops to that
storage.

So I'm wondering whether there are any best practices the community
knows and uses to cope with cases like this?

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
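
For reference, a minimal sketch of how retained (externalized) checkpoints together with incremental RocksDB checkpoints can be enabled - the checkpoint interval, the HDFS path and the RocksDB backend are just placeholders/assumptions for illustration, and the flink-statebackend-rocksdb dependency is assumed to be on the classpath:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // checkpoint every minute (placeholder interval)
        env.enableCheckpointing(60_000);
        // keep the last checkpoint when the job is cancelled, so the new
        // version of the job can be started from it
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // RocksDB backend with incremental checkpoints, so only the delta
        // is uploaded to HDFS on each checkpoint (path is a placeholder)
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        // ... job graph definition and env.execute() would follow here
    }
}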

Re: Fast restart of a job with a large state

Paul Lam
Hi,

Have you tried task local recovery [1]?
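
In case it helps, a minimal sketch of how it can be switched on; on a real cluster this is usually done via state.backend.local-recovery: true in flink-conf.yaml, so the local environment and the parallelism below are just for illustration:

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalRecoverySketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // keep a secondary copy of each task's state on local disk, so
        // recovery can read it locally instead of pulling it from HDFS
        conf.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
        // local environment only for illustration; on a cluster the option
        // is set in flink-conf.yaml for the TaskManagers
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(4, conf);
        // ... job definition and env.execute() as usual
    }
}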


Best,
Paul Lam



Re: Fast restart of a job with a large state

Paul Lam
The URL in my previous mail was wrong; it should be:

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery

Best,
Paul Lam




Re: Fast restart of a job with a large state

Stefan Richter
Hi,

If rescaling is the problem, let me clarify that you can currently rescale from savepoints and from all types of checkpoints (including incremental ones). If that was the only problem, then there is nothing to worry about - the documentation is only a bit conservative about this because we do not want to commit to a guarantee that all future types of checkpoints will be rescalable. But currently they all are, and this is also very unlikely to change anytime soon.
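
As a side note, how far a job can later be rescaled is bounded by its max parallelism, which is baked into the keyed state and cannot be changed afterwards, so it can be worth setting it explicitly up front. A rough sketch (the values are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RescaleBoundsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // upper bound for any later rescale of keyed state; fixed once the
        // state exists (value is arbitrary)
        env.setMaxParallelism(720);
        // the actual parallelism; this is what can be changed when the job
        // is restored from a savepoint or a retained checkpoint
        env.setParallelism(24);
        // ... job definition and env.execute() as usual
    }
}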

Paul, just to comment on your suggestion as well: local recovery would only help with failover. 1) It does not help with restarts triggered by the user, and 2) it also does not work for rescaling (2 is a consequence of 1, because failover never rescales, it only restarts).

Best,
Stefan





Re: Fast restart of a job with a large state

Sergey Zhemzhitsky
Hi Stefan, Paul,

Thanks for the tips! So far I have tried neither rescaling from checkpoints nor task-local recovery; now that's something to test.

In case it becomes necessary not just to rescale a job but also to change its DAG - is there a way to have something like, let's call it, "local savepoints" or "incremental savepoints", to avoid transferring the whole state to and from distributed storage?

Kind Regards,
Sergey






Re: Fast restart of a job with a large state

Till Rohrmann
Hi Sergey,

At the moment neither local nor incremental savepoints are supported in Flink, AFAIK. There have been some ideas regarding incremental savepoints floating around in the community, but nothing concrete yet.

Cheers,
Till





Re: Fast restart of a job with a large state

Sergey Zhemzhitsky
Hi Till,

Thanks for the info!
It's good to know.

Regards,
Sergey

