Hi,

I was looking into the details of the Flink snapshotting algorithm, which is also described here:
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://blog.acolyer.org/2015/08/19/asynchronous-distributed-snapshots-for-distributed-dataflows/
http://mail-archives.apache.org/mod_mbox/flink-user/201601.mbox/%3CCANC1h_s6MCWSuDf2zSnEeD66LszDoLx0jt64++0kBOKTjkAv7w%40mail.gmail.com%3E
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/About-exactly-once-question-td2545.html

From other sources I understand that the algorithm assumes no failures in message delivery and, for example, no process that hangs forever:
https://en.wikipedia.org/wiki/Snapshot_algorithm
https://blog.acolyer.org/2015/04/22/distributed-snapshots-determining-global-states-of-distributed-systems/

So my understanding (maybe wrong) is that this solution does not address fault tolerance as strongly as, for example, a 3PC protocol for local state propagation and global agreement would. I know the latter is not efficient; I mention it only for comparison.

How does the algorithm behave in practice when the snapshotting itself fails (it is a background process collecting partial states)? Are there timeouts for reaching a barrier?

PS. I have not looked deep into the code details yet, but I am planning to.
|
Hi Stavros,

I haven't implemented our checkpointing mechanism and I didn't participate in the design decisions while implementing it, so I cannot compare it in detail to other approaches.

From a "does it work" perspective: checkpoints are only confirmed if all parallel subtasks successfully created a valid snapshot of the state. So if there is a failure in the checkpointing mechanism, no valid checkpoint will be created. The system will recover from the last valid checkpoint.

There is a timeout for checkpoints. So if a barrier doesn't pass through the system for a certain period of time, the checkpoint is cancelled. The default timeout is 10 minutes.

Regards,
Robert

On Mon, May 16, 2016 at 1:22 PM, Stavros Kontopoulos <[hidden email]> wrote:
|
Cool, thanks. So if a checkpoint expires, does the whole pipeline block or fail, or only the specific task related to the operator (running along with the checkpoint task), or does nothing happen?
On Tue, May 17, 2016 at 3:49 PM, Robert Metzger <[hidden email]> wrote:
|
Hi Stavros,
Currently, rollback failure recovery in Flink works at the pipeline level, not at the task level (see MillWheel [1]). It further builds on replayable stream logs (e.g. Kafka), so there is no need for 3PC or backup in the pipeline sources. You can also check this presentation [2], which I hope explains the basic concepts in more detail. Note that many optimisation opportunities are going to be addressed on the near-term Flink roadmap.
Paris
|
Regarding your last question,
If a checkpoint expires, it just gets invalidated, and a new complete checkpoint that can be used for recovery will eventually occur. If I am wrong or something has changed, please correct me.
Paris
|
In reply to this post by Paris Carbone
Hey, thanks for the links. There are assumptions, though, like reliable channels. In practice you rely on TCP, and if a checkpoint fails or is very slow you need to deal with it; that's why I asked earlier what happens then. 3PC does not need assumptions, I think, but engineering is more practical (it should be) and a different story in general. [2] also mentions the assumptions.
Best,
On Thu, May 19, 2016 at 9:14 PM, Paris Carbone <[hidden email]> wrote:
|
In reply to this post by Paris Carbone
"Checkpoints are only confirmed if all parallel subtasks successfully created a valid snapshot of the state," as stated by Robert. So, to rephrase my question: how is it confirmed that all snapshots are finished, and what happens if some task is very slow or blocked? If you have N tasks confirmed and one missing, what do you do? Do you start a new checkpoint for that one task, or a new global checkpoint for the other N tasks as well?
On Thu, May 19, 2016 at 9:21 PM, Paris Carbone <[hidden email]> wrote:
|
In reply to this post by Stavros Kontopoulos
True, if you like formal modelling and stuff like that you can think of it as a more relaxed/abortable operation (e.g. like abortable consensus) which yields the same guarantees and works ok in semi-synchronous distributed systems (as in the case of Flink).
|
In reply to this post by Stavros Kontopoulos
In that case, typically a timeout invalidates the whole snapshot (all states for the same epoch) until eventually we have a full complete snapshot.
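The behaviour described here (a checkpoint only counts once every parallel subtask has acknowledged it, and a timeout invalidates the whole epoch) can be sketched roughly as follows. This is a minimal, self-contained illustration and not Flink's actual coordinator code: `CheckpointTracker`, its methods, and the millisecond clock passed in by the caller are all hypothetical names made up for this sketch.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical illustration only; none of these names are Flink classes. */
class CheckpointTracker {
    enum Status { PENDING, COMPLETED, EXPIRED }

    private final int parallelism;     // number of subtasks that must acknowledge
    private final long timeoutMillis;  // how long a checkpoint may stay pending
    private final Map<Long, Set<Integer>> acks = new HashMap<>();
    private final Map<Long, Long> startTimes = new HashMap<>();
    private final Map<Long, Status> status = new HashMap<>();
    private long lastCompleted = -1;   // id of the checkpoint usable for recovery

    CheckpointTracker(int parallelism, long timeoutMillis) {
        this.parallelism = parallelism;
        this.timeoutMillis = timeoutMillis;
    }

    /** Starts a new epoch; in a real system barriers would be injected here. */
    void trigger(long id, long nowMillis) {
        acks.put(id, new HashSet<>());
        startTimes.put(id, nowMillis);
        status.put(id, Status.PENDING);
    }

    /** Records one subtask's acknowledgement; late or unknown acks are ignored. */
    void acknowledge(long id, int subtask, long nowMillis) {
        expireIfLate(id, nowMillis);
        if (status.get(id) != Status.PENDING) {
            return;
        }
        Set<Integer> received = acks.get(id);
        received.add(subtask);
        if (received.size() == parallelism) {
            // Only now is the snapshot valid as a whole.
            status.put(id, Status.COMPLETED);
            lastCompleted = Math.max(lastCompleted, id);
        }
    }

    /** Invalidates the whole epoch, including the acks already received. */
    void expireIfLate(long id, long nowMillis) {
        if (status.get(id) == Status.PENDING
                && nowMillis - startTimes.get(id) > timeoutMillis) {
            status.put(id, Status.EXPIRED);
        }
    }

    Status statusOf(long id) { return status.get(id); }

    long lastCompleted() { return lastCompleted; }

    public static void main(String[] args) {
        CheckpointTracker tracker = new CheckpointTracker(3, 1000);
        tracker.trigger(1, 0);
        tracker.acknowledge(1, 0, 100);
        tracker.acknowledge(1, 1, 200);
        tracker.acknowledge(1, 2, 5000); // arrives after the timeout
        System.out.println("checkpoint 1: " + tracker.statusOf(1));
        tracker.trigger(2, 5000);
        for (int subtask = 0; subtask < 3; subtask++) {
            tracker.acknowledge(2, subtask, 5100);
        }
        System.out.println("checkpoint 2: " + tracker.statusOf(2));
        System.out.println("recover from: " + tracker.lastCompleted());
    }
}
```

In this sketch the two acknowledgements that arrived in time for the first epoch are simply discarded with it; nothing is retried per task, and recovery always uses the most recent epoch that completed in full.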
|
Yes, that's what I was thinking, thanks. When people hear "exactly once" they think "are you sure? There must be something hidden there...", because theory is theory :) So if you keep getting invalidated snapshots while data still passes through the operators, do you issue a warning, or fail the pipeline and return an exception to the driver?
On Thu, May 19, 2016 at 9:30 PM, Paris Carbone <[hidden email]> wrote:
|
Invalidations are not necessarily exposed (I hope). Think of it like implementing TCP: you don’t have to warn the user that packets are lost, since in an eventually synchronous system a packet will eventually be received at the other side. Snapshots follow the same paradigm. Hope that helps.
|
In reply to this post by Paris Carbone
I was wondering how checkpoints can be async? Because your state is constantly mutating. You probably need versioned state, or immutable data structs?
-Abhishek-
|
Hi Abhishek,
I don’t see the problem there (also this is unrelated to the snapshotting protocol).
Intuitively, if you submit a copy of your state (full or delta) for a snapshot version/epoch to a store backend and validate the full snapshot for that version when you eventually receive the acknowledgements this still works fine. Am I missing
something?
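The idea of handing a copy of the state for a given epoch to a store backend while processing carries on can be sketched as below. This is only an illustration under stated assumptions: `AsyncSnapshotSketch` and its methods are hypothetical, the in-memory map named `checkpointStore` stands in for a durable backend, and the state is small enough to copy in memory (which temporarily holds a second copy of it).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; not a Flink state backend. */
class AsyncSnapshotSketch {
    // Operator state, touched only by the processing thread.
    private final Map<String, Long> state = new HashMap<>();
    // Stand-in for durable storage; keyed by checkpoint id (epoch).
    private final Map<Long, Map<String, Long>> checkpointStore = new ConcurrentHashMap<>();

    void update(String key, long value) {
        state.put(key, value);
    }

    /**
     * Synchronous part: freeze a copy of the state for this epoch.
     * Asynchronous part: write the frozen copy to the store in the background.
     * The returned future completes with the checkpoint id and acts as the
     * acknowledgement for this epoch.
     */
    CompletableFuture<Long> snapshot(long checkpointId) {
        Map<String, Long> frozen = Collections.unmodifiableMap(new HashMap<>(state));
        return CompletableFuture.supplyAsync(() -> {
            checkpointStore.put(checkpointId, frozen);
            return checkpointId;
        });
    }

    Map<String, Long> restore(long checkpointId) {
        return checkpointStore.get(checkpointId);
    }

    public static void main(String[] args) {
        AsyncSnapshotSketch operator = new AsyncSnapshotSketch();
        operator.update("count", 1L);
        CompletableFuture<Long> ack = operator.snapshot(1L);
        operator.update("count", 2L); // processing continues during the upload
        long acknowledged = ack.join();
        System.out.println("epoch " + acknowledged + " holds " + operator.restore(acknowledged));
    }
}
```

Because the copy is taken before the background write starts, later updates to the live state do not leak into the stored epoch.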
|
In reply to this post by abhishekrs
The problem here is different, though: if something keeps failing (permanently), in practice someone needs to be notified. If the user loses snapshotting, they must know.
On Thu, May 19, 2016 at 9:36 PM, Abhishek R. Singh <[hidden email]> wrote:
|
Sure, in practice you can set a threshold of retries, since an operator implementation (or any other cause) could make snapshots fail indefinitely and render snapshotting infeasible. If I recall correctly, that threshold exists in the Flink configuration.
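As a rough sketch of such a threshold, assuming we simply count consecutive invalidated checkpoints and escalate once a limit is exceeded: the class and method names below are hypothetical and do not correspond to any Flink configuration option.

```java
/** Hypothetical illustration only; not a Flink class or setting. */
class CheckpointFailureCounter {
    private final int tolerableConsecutiveFailures;
    private int consecutiveFailures = 0;

    CheckpointFailureCounter(int tolerableConsecutiveFailures) {
        this.tolerableConsecutiveFailures = tolerableConsecutiveFailures;
    }

    /** A complete checkpoint resets the count. */
    void onCheckpointCompleted() {
        consecutiveFailures = 0;
    }

    /**
     * Called when a checkpoint expires or is invalidated.
     * Returns true once the limit is exceeded, i.e. when the caller should
     * notify the user or fail the job instead of silently retrying.
     */
    boolean onCheckpointInvalidated() {
        consecutiveFailures++;
        return consecutiveFailures > tolerableConsecutiveFailures;
    }

    public static void main(String[] args) {
        CheckpointFailureCounter counter = new CheckpointFailureCounter(2);
        for (int attempt = 1; attempt <= 3; attempt++) {
            boolean escalate = counter.onCheckpointInvalidated();
            System.out.println("invalidated attempt " + attempt + ", escalate=" + escalate);
        }
    }
}
```

Counting consecutive failures rather than total failures keeps occasional expired checkpoints invisible, in line with the TCP analogy, while a permanent problem still surfaces.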
|
In reply to this post by Paris Carbone
If you can take atomic in-memory copies, then it works (at the cost of doubling your instantaneous memory). For larger state (say RocksDB), won’t you have to stop the world (atomic snapshot) and make a copy? Doesn’t that make it synchronous, instead of background/async?
Sorry Stavros, for bumping into your thread. This should probably have been a new thread (I changed the subject in an attempt to fix up).
-Abhishek-
|
In reply to this post by Paris Carbone
Cool thnx Paris. On Thu, May 19, 2016 at 9:48 PM, Paris Carbone <[hidden email]> wrote:
|
In reply to this post by abhishekrs
No problem ;) On Thu, May 19, 2016 at 9:54 PM, Abhishek R. Singh <[hidden email]> wrote:
|
In reply to this post by abhishekrs
On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh <[hidden email]> wrote:
> If you can take atomic in-memory copies, then it works (at the cost of
> doubling your instantaneous memory). For larger state (say RocksDB), won’t
> you have to stop the world (atomic snapshot) and make a copy? Doesn’t that
> make it synchronous, instead of background/async?

Hey Abhishek,

that's correct. There are two variants for RocksDB:

- semi-async (default): the snapshot is taken via the RocksDB backup feature, which backs up to a directory (sync). This is then copied to the final checkpoint location (async, e.g. copy to HDFS).
- fully-async: the snapshot is taken via the RocksDB snapshot feature (sync, but no full copy and essentially "free"). With this snapshot we iterate over all k/v pairs and copy them to the final checkpoint location (async, e.g. copy to HDFS).

You enable the second variant via:

rocksDbBackend.enableFullyAsyncSnapshots();

This is only part of the 1.1-SNAPSHOT version though. I'm not too familiar with the performance characteristics of both variants, but maybe Aljoscha can chime in.

Does this clarify things for you?

– Ufuk
|
Yes. Thanks for explaining.
On Friday, May 20, 2016, Ufuk Celebi <[hidden email]> wrote:
|