external checkpoints

external checkpoints

aviad
Hi,
I have several jobs which are configured for externalized checkpointing (enableExternalizedCheckpoints).
How can I correlate checkpoints with jobs?
For example, I want to write a script that monitors whether a job is up and, if the job is down, resumes it from its externalized checkpoint.
How can I know which checkpoint belongs to a specific job?

Can I configure each job to write its externalized checkpoints to a different location?

my configuration is:
state.backend: rocksdb
state.backend.fs.checkpointdir: s3a://flink-bucket/backend/checkpoints
state.checkpoints.dir: s3a://flink-bucket/checkpoints

and in the code I set:
enableCheckpointing
enableExternalizedCheckpoints

Re: external checkpoints

Jins George
Hi Aviad,

I had a similar situation, and my solution was to use the Flink monitoring REST API (/jobs/{jobid}/checkpoints) to get the mapping between a job and its checkpoint file.
Wrap this in a script and run it periodically (in my case, every 30 seconds).

You can also configure each job with its own externalized checkpoint directory. Refer to https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure
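
For illustration, the REST polling described above could be sketched roughly like this in Python. It is only a sketch under assumptions: the JobManager address and job ID are placeholders, and the response is assumed to carry the path under latest -> completed -> external_path, so check the actual JSON returned by your Flink version before relying on it. The helper names (fetch_json, latest_external_path, resume_command) are made up for this example.

```python
import json
from typing import List, Optional
from urllib.request import urlopen


def fetch_json(url: str) -> dict:
    """GET a URL on the JobManager's monitoring REST API and decode the JSON body."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)


def latest_external_path(stats: dict) -> Optional[str]:
    """Return the external path of the most recent completed checkpoint.

    Assumes a response shaped like
    {"latest": {"completed": {"external_path": "..."}}};
    returns None when no completed checkpoint is reported.
    """
    completed = (stats.get("latest") or {}).get("completed") or {}
    return completed.get("external_path")


def resume_command(external_path: str, jar: str) -> List[str]:
    """Build a `flink run -s <path> <jar>` command that resumes from the checkpoint."""
    return ["bin/flink", "run", "-s", external_path, jar]


def poll_once(base_url: str, job_id: str) -> Optional[str]:
    """Query /jobs/{jobid}/checkpoints once and return the latest external path."""
    stats = fetch_json(f"{base_url}/jobs/{job_id}/checkpoints")
    return latest_external_path(stats)
```

A periodic job could call poll_once("http://<jobmanager>:8081", "<jobid>") and store the returned path per job. If the job later goes down, the last stored path can be passed to resume_command to restart it.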

Thanks,
Jins

On 11/15/2017 06:34 AM, Aviad Rotem wrote:

Re: external checkpoints

aviad
Hi,

Thanks for the answer.
I can use the first option (the REST API).
For some reason this endpoint is not documented in the Flink documentation
(https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html)

Regarding the second option, configuring each job with its own externalized
checkpoint directory:
I don't see how I can do it.
From the Flink documentation:
"The target directory for the externalized checkpoint’s meta data is
determined from the configuration key state.checkpoints.dir which,
currently, *can only be set via the configuration files.*"

Am I missing something?
Is there another way to configure externalized checkpoints?

thanks
Aviad



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: external checkpoints

Fabian Hueske-2
Hi Aviad,

sorry for the late reply.
You can configure the checkpoint directory (which is also used for externalized checkpoints) when you create the state backend:

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

This configures the checkpoint directory to be hdfs:///checkpoints-data.

Best, Fabian

2017-11-16 9:26 GMT+01:00 aviad <[hidden email]>:

Re: external checkpoints

Aljoscha Krettek
Hi,

I think we might have a slight misunderstanding here. In most situations you do in fact have to configure two directories:
 - the directory where the metadata for externalised checkpoints is stored; this is "state.checkpoints.dir"
 - the directory where the actual checkpoint data is stored; this is the constructor argument for state backends

As you noticed, you can only configure the former in the configuration file, which makes it hard to work with multiple jobs from the same Flink directory. If you are running your jobs via YARN, you can use "-yD" options to configure those settings per job. This is an example invocation that I use:

HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -c com.dataartisans.flink.example.eventpattern.DataGeneratorJob  \
-m yarn-cluster -yn 2 -ys 1 -ytm 4000 \
-yD yarn.maximum-failed-containers=100000000 -yD yarn.application-attempts=100000000 \
-yD high-availability=zookeeper \
-yD high-availability.zookeeper.quorum=some-ip:2181 \
-yD high-availability.zookeeper.storageDir=s3://aljoscha/data-generator-ha \
-yD high-availability.zookeeper.path.root=/flink-data-generator \
-yD state.checkpoints.dir=s3://aljoscha/data-generator-external-checkpoints \
../flink-state-machine-kafka011-1.0-SNAPSHOT.jar \
--parallelism 2 --topic events6 --bootstrap.servers some-other-ip:9092 \
--numKeys 10000000 --sleep 1 \
--checkpointDir s3://aljoscha/data-generator-11-checkpoints --externalizedCheckpoints true

This is not a good situation, and there are plans to fix it in 1.5.
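
If several jobs are launched from a script, the per-job "-yD state.checkpoints.dir=..." setting from the invocation above could be generated along these lines. This is a rough Python sketch, not a tested deployment script: the function name, the job names, and the one-subdirectory-per-job layout are assumptions for illustration, and only the "-m yarn-cluster" and "-yD" options shown in the invocation above are used.

```python
from typing import List


def per_job_run_command(job_name: str, jar: str, checkpoint_root: str) -> List[str]:
    """Build a YARN per-job `flink run` command with its own metadata directory.

    Each job gets <checkpoint_root>/<job_name> as state.checkpoints.dir, so the
    externalized checkpoint metadata of different jobs does not get mixed up.
    """
    metadata_dir = f"{checkpoint_root.rstrip('/')}/{job_name}"
    return [
        "bin/flink", "run",
        "-m", "yarn-cluster",
        "-yD", f"state.checkpoints.dir={metadata_dir}",
        jar,
    ]
```

For example, per_job_run_command("job-a", "job-a.jar", "s3a://flink-bucket/checkpoints") yields a command whose metadata directory is s3a://flink-bucket/checkpoints/job-a. The resulting list can be handed to subprocess.run(cmd, check=True).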

Best,
Aljoscha


> On 24. Nov 2017, at 11:39, Fabian Hueske <[hidden email]> wrote: