Hi,
I have several jobs configured for externalized checkpoints (enableExternalizedCheckpoints). How can I correlate checkpoints with jobs? For example, if I want to write a script that monitors whether a job is up, and resumes the job from its externalized checkpoint when it is down, how can I know which checkpoint belongs to which job? Can I configure each job to write its externalized checkpoints to a different location?

My configuration is:

state.backend: rocksdb
state.backend.fs.checkpointdir: s3a://flink-bucket/backend/checkpoints
state.checkpoints.dir: s3a://flink-bucket/checkpoints

and in the code I call enableCheckpointing and enableExternalizedCheckpoints.
Hi Aviad,
I had a similar situation, and my solution was to use the Flink monitoring REST API (/jobs/{jobid}/checkpoints) to get the mapping between a job and its checkpoint files. Wrap this in a script and run it periodically (in my case, every 30 seconds). You can also configure each job with its own externalized checkpoint directory. See https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html#directory-structure

Thanks,
Jins

On 11/15/2017 06:34 AM, Aviad Rotem wrote:
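A minimal sketch of such a monitoring script in Python. The REST endpoint is the one named above; the exact JSON field names ("latest", "completed", "external_path") are an assumption based on the checkpoint stats of that Flink version and may differ between releases, and the JobManager address is a placeholder:

```python
import json
from urllib.request import urlopen

FLINK_URL = "http://localhost:8081"  # assumed JobManager address


def latest_external_checkpoint(stats):
    """Extract the externalized checkpoint path from the JSON returned
    by /jobs/{jobid}/checkpoints (field names are assumptions)."""
    completed = stats.get("latest", {}).get("completed") or {}
    return completed.get("external_path")


def poll_job(job_id):
    # Query the monitoring REST API for this job's checkpoint stats.
    url = "%s/jobs/%s/checkpoints" % (FLINK_URL, job_id)
    with urlopen(url) as resp:
        return latest_external_checkpoint(json.load(resp))


if __name__ == "__main__":
    # Offline example of the parsing step: a restart script would record
    # this mapping and later resume the job via `flink run -s <path> ...`.
    sample = {"latest": {"completed": {
        "external_path": "s3a://flink-bucket/checkpoints/checkpoint-meta"}}}
    print(latest_external_checkpoint(sample))
```

Running this in a loop (e.g. every 30 seconds, as above) and persisting the job-to-checkpoint mapping gives the restart script what it needs.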
Hi,
thanks for the answer. I can use the first option (the REST API); for some reason it is undocumented in the Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html).

Regarding the second option, configuring each job with its own externalized checkpoint directory: I don't see how I can do that. From the Flink documentation: "The target directory for the externalized checkpoint’s meta data is determined from the configuration key state.checkpoints.dir which, currently, *can only be set via the configuration files.*"

Am I missing something? Is there another way to configure externalized checkpoints?

thanks
Aviad
Hi Aviad,

sorry for the late reply. You can configure the checkpoint directory (which is also used for externalized checkpoints) when you create the state backend:

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

This configures the checkpoint directory to be hdfs:///checkpoints-data.

Best,
Fabian

2017-11-16 9:26 GMT+01:00 aviad <[hidden email]>:
Hi,
I think we might have a slight misunderstanding here. In most situations you do in fact have to configure two directories:

- the directory where the metadata for externalized checkpoints is stored; this is "state.checkpoints.dir"
- the directory where the actual checkpoint data is stored; this is the constructor argument of the state backend

As you noticed, you can only configure the former in the configuration file, which makes it hard to work with multiple jobs from the same Flink directory. If you are running your jobs via YARN, you can use "-yD" options to configure those settings per job. This is an example invocation that I use:

HADOOP_CONF_DIR=/etc/hadoop/conf bin/flink run -c com.dataartisans.flink.example.eventpattern.DataGeneratorJob \
  -m yarn-cluster -yn 2 -ys 1 -ytm 4000 \
  -yD yarn.maximum-failed-containers=100000000 -yD yarn.application-attempts=100000000 \
  -yD high-availability=zookeeper \
  -yD high-availability.zookeeper.quorum=some-ip:2181 \
  -yD high-availability.zookeeper.storageDir=s3://aljoscha/data-generator-ha \
  -yD high-availability.zookeeper.path.root=/flink-data-generator \
  -yD state.checkpoints.dir=s3://aljoscha/data-generator-external-checkpoints \
  ../flink-state-machine-kafka011-1.0-SNAPSHOT.jar --parallelism 2 --topic events6 \
  --bootstrap.servers some-other-ip:9092 --numKeys 10000000 --sleep 1 \
  --checkpointDir s3://aljoscha/data-generator-11-checkpoints --externalizedCheckpoints true

This is not a good situation, and there are plans to fix it for 1.5.

Best,
Aljoscha

> On 24. Nov 2017, at 11:39, Fabian Hueske <[hidden email]> wrote:
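The per-job "-yD" approach above lends itself to scripting. A hypothetical Python sketch (the helper name, bucket layout, and jar name are made up for illustration; only the "-yD state.checkpoints.dir=..." mechanism comes from the invocation above):

```python
def flink_run_args(jar, job_name, bucket):
    """Build a per-job `flink run` argument list that points
    state.checkpoints.dir (the externalized checkpoint metadata
    directory) at a job-specific location, as in the -yD example
    above. Illustrative helper, not a real Flink API."""
    ext_dir = "s3://%s/%s/external-checkpoints" % (bucket, job_name)
    return [
        "bin/flink", "run",
        "-m", "yarn-cluster",
        # per-job externalized checkpoint metadata directory
        "-yD", "state.checkpoints.dir=%s" % ext_dir,
        jar,
    ]


args = flink_run_args("my-job.jar", "job-a", "flink-bucket")
print(" ".join(args))
```

Because each job then writes its checkpoint metadata under its own prefix, the monitoring script from earlier in the thread can recover the job-to-checkpoint mapping from the directory layout alone.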