Note that "exactly-once" means that records are guaranteed to be processed exactly once by Flink operators, and thus updates to operator state happen exactly once (e.g., if C had a counter that x1, x2, and x3 incremented, the counter would have a value of 3 and not a value of 6). This is not specific to Flink; it is the most widely accepted definition and applies to all stream processing systems. The reason is that the stream processor cannot by itself guarantee what happens in the outside world (in this case, the data sink).

See the docs (https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html):

"Apache Flink offers a fault tolerance mechanism to consistently recover the state of data streaming applications. The mechanism ensures that even in the presence of failures, the program’s state will eventually reflect every record from the data stream exactly once."

Guaranteeing exactly-once delivery to the sink is possible, as Marton suggests, but the sink implementation needs to be aware of, and take part in, the checkpointing mechanism.

On Thu, Aug 27, 2015 at 1:14 PM, Márton Balassi <[hidden email]> wrote:

Dear Zhangrucong,

From your explanation it seems that you have a good general understanding of Flink's checkpointing algorithm. Your concern is valid: by default, a sink C which emits tuples to the "outside world" potentially emits them multiple times. A neat trick to solve this issue for your user-defined sinks is to use the CheckpointNotifier interface to output records only after the corresponding checkpoint has been fully processed by the system, so sinks can also provide exactly-once guarantees in Flink.

This would mean that your SinkFunction has to implement both the Checkpointed and the CheckpointNotifier interfaces. The idea is to mark the output tuples with the corresponding checkpoint ID, so that they can be emitted in a "consistent" manner once the checkpoint is globally acknowledged by the system.
You buffer your output records in a collection of your choice, and whenever snapshotState of the Checkpointed interface is invoked you mark your fresh output records with the current checkpoint ID. Whenever notifyCheckpointComplete is invoked you emit the records with the corresponding ID.

Note that this adds latency to your processing, and as you potentially need to checkpoint a lot of data in the sinks, I would recommend using HDFS as the state backend instead of the default solution.

Best,
Marton

On Thu, Aug 27, 2015 at 12:32 PM, Zhangrucong <[hidden email]> wrote:

Hi:
The documentation says Flink can guarantee that each tuple is processed exactly once, but I cannot understand how it works.
For example, in Fig 1, C is running between snapshot n-1 and snapshot n (snapshot n hasn't been generated yet). After snapshot n-1, C has processed tuples x1, x2, x3 and already output them to the user; then C fails and recovers from snapshot n-1. In my opinion, x1, x2, x3 will be processed and output to the user again. My question is: how does Flink guarantee that x1, x2, x3 are processed and output to the user only once?
Fig 1.
Thanks for answering.
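
To make the buffering approach Marton describes concrete, here is a minimal plain-Java sketch of the scenario from the question. It does not use Flink's API: the class BufferingSink and its methods invoke, snapshotState, restoreState, and notifyCheckpointComplete are hypothetical stand-ins that only mirror the roles of the SinkFunction, Checkpointed, and CheckpointNotifier interfaces named in the thread. The "outside world" is modeled as a plain list, and the checkpoint IDs 1 and 2 stand for snapshots n-1 and n.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a user-defined sink; it does not depend on Flink.
class BufferingSink {
    // Records received since the last snapshot, not yet tied to a checkpoint.
    private final List<String> fresh = new ArrayList<>();
    // Records marked with a checkpoint ID, waiting for that checkpoint to be acknowledged.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Stand-in for the external system; it survives a failure of the sink.
    private final List<String> outsideWorld;

    BufferingSink(List<String> outsideWorld) {
        this.outsideWorld = outsideWorld;
    }

    // Buffer the record instead of emitting it right away.
    void invoke(String record) {
        fresh.add(record);
    }

    // Mark the fresh records with the checkpoint ID and return a copy of the sink state.
    TreeMap<Long, List<String>> snapshotState(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(fresh));
        fresh.clear();
        return deepCopy(pending);
    }

    // Replace the sink state with a previously taken snapshot.
    void restoreState(TreeMap<Long, List<String>> snapshot) {
        fresh.clear();
        pending.clear();
        pending.putAll(deepCopy(snapshot));
    }

    // Emit every record marked with this checkpoint ID or an earlier one.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> acknowledged = pending.headMap(checkpointId, true);
        for (List<String> batch : acknowledged.values()) {
            outsideWorld.addAll(batch);
        }
        acknowledged.clear(); // headMap is a view, so this also removes them from pending
    }

    private static TreeMap<Long, List<String>> deepCopy(Map<Long, List<String>> source) {
        TreeMap<Long, List<String>> copy = new TreeMap<>();
        for (Map.Entry<Long, List<String>> e : source.entrySet()) {
            copy.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        List<String> world = new ArrayList<>();

        // Snapshot n-1 (ID 1) is taken and acknowledged.
        BufferingSink sink = new BufferingSink(world);
        TreeMap<Long, List<String>> snapshot = sink.snapshotState(1L);
        sink.notifyCheckpointComplete(1L);

        // x1..x3 arrive, then the sink fails before snapshot n exists.
        sink.invoke("x1");
        sink.invoke("x2");
        sink.invoke("x3");

        // Recovery: a new sink instance restores snapshot n-1 and the records are replayed.
        BufferingSink recovered = new BufferingSink(world);
        recovered.restoreState(snapshot);
        recovered.invoke("x1");
        recovered.invoke("x2");
        recovered.invoke("x3");

        // Snapshot n (ID 2) is taken and acknowledged; only now are the records emitted.
        recovered.snapshotState(2L);
        recovered.notifyCheckpointComplete(2L);

        System.out.println(world); // prints [x1, x2, x3]
    }
}
```

Because the first sink instance never emitted its buffered x1, x2, x3 before failing, the replay after recovery is the only copy that reaches the list. This sketch leaves out at least one case: a failure between the acknowledgement of a checkpoint and the emission of its records would need extra handling, for example an idempotent write to the external system.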