Hi, We are using Flink 1.3.2 and are exploring the RocksDB state backend and checkpointing. The data source is Kafka, and checkpointing is enabled in Flink. We have a few questions about this:
Hi Ashwin,

I can answer the first question you asked. Checkpointing is the mechanism that makes your program fault tolerant; Flink implements it with distributed snapshots. That raises the question of where the state of your program is stored, and this is where the state backend comes in. You can keep your state in memory, on a filesystem, or in RocksDB; the default is the memory state backend. For more, please see [1], [2].

Cheers,
Minglei
Hi Ashwin,

I think the questions are a bit general, which makes it hard to give answers that match exactly what you expect. Could you briefly outline your use case and relate it to the questions? That would make it much easier to help. I would also suggest having a look at https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/state/ for information on state, state backends, and checkpoints. In the meantime, here are my answers based on my own understanding; I hope they help.

> What is the exact difference between checkpoint and state backend.

In a nutshell (for the sake of understanding, we only consider the keyed state backend here), you can think of the state backend as a local database on each stateful node that stores key-value pairs. A checkpoint consists of a snapshot of the state backends across the distributed environment at a specific point in time and, as @Minglei mentioned, it is used for fault tolerance. When the job fails, it can be recovered from the latest successful checkpoint (the state backend's snapshot is used to initialize the state backend) and continue its work without losing any data, giving an "exactly once" result (you can also configure checkpointing to achieve an "at least once" result).

> Is the data stored in rocksdb checkpoints incremental (it keeps all past data also in newer file)? New checkpoint is created after defined interval and does it contain the previous checkpoint's data? Our use case demands all the checkpoint data to be in a single db, but when we manually restart the job its id changes and a new directory gets created (new metadata file in case of savepoints).

Every checkpoint consists of a complete snapshot of the state backends in the distributed environment, which means it covers the previous state data; its implementation, however, can be incremental.
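To make the distinction above concrete, here is a minimal plain-Java sketch. It is only a toy model, not Flink code: `ToyStateBackend`, `snapshot()` and `restore()` are hypothetical names made up for this illustration. A map stands in for the keyed state backend, a checkpoint is a complete point-in-time copy of that map, and recovery re-initializes the map from the latest copy.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these classes or methods are Flink APIs.
final class CheckpointSketch {

    // Stands in for a keyed state backend: a local key-value store on one stateful node.
    static final class ToyStateBackend {
        private Map<String, Long> state = new HashMap<>();

        // Normal processing: update the running value stored for a key.
        void add(String key, long delta) {
            state.merge(key, delta, Long::sum);
        }

        Long get(String key) {
            return state.get(key);
        }

        // A checkpoint is a complete, point-in-time copy of the state.
        Map<String, Long> snapshot() {
            return new HashMap<>(state);
        }

        // Recovery re-initializes the backend from a checkpoint.
        void restore(Map<String, Long> checkpoint) {
            state = new HashMap<>(checkpoint);
        }
    }

    public static void main(String[] args) {
        ToyStateBackend backend = new ToyStateBackend();
        backend.add("user-a", 3);
        backend.add("user-b", 5);
        Map<String, Long> checkpoint1 = backend.snapshot();

        backend.add("user-a", 4); // processed after checkpoint 1
        Map<String, Long> checkpoint2 = backend.snapshot();

        // The newer checkpoint still covers the state that checkpoint 1 covered.
        System.out.println(checkpoint2.keySet().containsAll(checkpoint1.keySet()));

        backend.add("user-b", 100);   // update not covered by any checkpoint
        backend.restore(checkpoint2); // simulate a failure followed by recovery
        System.out.println(backend.get("user-a") + " " + backend.get("user-b"));
    }
}
```

In this model every snapshot is a full copy. An incremental implementation would store only what changed since the previous snapshot, but restoring from it would still yield the complete state.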
> What data does rocksdb store inside in case of checkpoints? We are interested in knowing whether it stores actual aggregations or it stores the offsets metadata for an aggregation window?

1. The RocksDB state backend stores the key-value pairs, and the checkpoint consists of a snapshot of the RocksDB state backend.
2. I am not sure exactly what you mean here. I suppose you are using something like the following in your job:

{code}
stream.keyBy(key field).window(window size).process(AggregationFunc())
{code}

If your job can be described as above, then the aggregation results (generated in AggregationFunc) are stored in RocksDB as the value part, and the corresponding key part is the "key field" (or the "key field" + "window" if you are using per-window state).

> If we run aggregations on past data, then will it take help of state backend to not run aggregations again and give results by querying the state backend, saving the processing time?

The state backend is used to store the key-value pairs; the aggregation logic is done by the user code. I think you could use the state to do what you expect here.

Best,
Sihua

On 06/25/2018 09:31, [hidden email] wrote:
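As a footnote to the answers above about what ends up in the state, here is a small plain-Java sketch. It is a toy model under stated assumptions, not Flink code: `WindowStateSketch`, the `key@windowStart` composite key, and tumbling windows computed from non-negative millisecond timestamps are all assumptions made for this illustration. The value stored is the running aggregate itself, keyed by "key field" + window, and the user code decides when to read a stored aggregate instead of recomputing it.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: not a Flink API. Assumes tumbling windows and non-negative timestamps.
final class WindowStateSketch {
    private final long windowSizeMs;

    // Composite key "key@windowStart" -> running aggregate (here: a sum).
    private final Map<String, Long> state = new HashMap<>();

    WindowStateSketch(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    // Builds the state key from the record key and the start of its window.
    private String stateKey(String key, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        return key + "@" + windowStart;
    }

    // User-side aggregation logic: fold the new value into the stored aggregate.
    void add(String key, long timestampMs, long value) {
        state.merge(stateKey(key, timestampMs), value, Long::sum);
    }

    // Reads the stored aggregate for a key and window; null if nothing was stored.
    Long aggregateFor(String key, long timestampMs) {
        return state.get(stateKey(key, timestampMs));
    }

    public static void main(String[] args) {
        WindowStateSketch sketch = new WindowStateSketch(10_000L);
        sketch.add("sensor-1", 1_000L, 2L);
        sketch.add("sensor-1", 4_000L, 3L);  // same window as the first record
        sketch.add("sensor-1", 12_000L, 7L); // falls into the next window

        System.out.println(sketch.aggregateFor("sensor-1", 0L));
        System.out.println(sketch.aggregateFor("sensor-1", 10_000L));
    }
}
```

Looking up `aggregateFor` returns the stored sum without touching the original records again, which is the kind of reuse the last question asks about; whether that fits a real job depends on how the user code manages its state.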