Hi,
It is my understanding that the exactly-once semantics regarding the input from Kafka is based on the checkpointing in the source component retaining the offset where it was at the checkpoint moment.

My question is: how does that work for a sink? How can I make sure that (in light of failures) each message that is read from Kafka (my input) is written to Kafka (my output) exactly once?

Best regards / Met vriendelijke groeten,
Niels Basjes
Hi Niels!

In general, exactly-once output requires transactional cooperation from the target system. Kafka has that on the roadmap; we should be able to integrate it once it is out. That means output is "committed" upon completed checkpoints, which guarantees nothing is written multiple times.

Chesnay is working on an interesting prototype as a generic solution (also for Kafka, while they don't have that feature): it buffers the data in the sink persistently (using the fault tolerance state backends) and pushes the results out on notification of a completed checkpoint. That gives you exactly-once semantics, but involves an extra materialization of the data.

I think that there is actually a fundamental latency issue with "exactly-once sinks", no matter how you implement them in any system: you can only commit once you are sure that everything went well, up to a specific point where you are sure no replay will ever be needed. So the latency in Flink for an exactly-once output would be at least the checkpoint interval.

I'm eager to hear your thoughts on this.

Greetings,
Stephan
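For illustration, a minimal sketch of the buffering approach described here (not Chesnay's actual prototype): records are held back in the sink and only pushed to Kafka once the checkpoint that covers them is confirmed complete. The class and topic names are hypothetical, the buffer is a plain field only to keep the sketch short (it would have to live in Flink's fault-tolerant state), and the Flink package names may differ between versions.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class BufferingKafkaSink extends RichSinkFunction<String> implements CheckpointListener {

    private final Properties kafkaProps;  // bootstrap.servers, key/value serializers, ...
    private final String topic;

    // Kept as a plain field only for brevity; a real sink would keep this buffer
    // in Flink's checkpointed (fault-tolerant) operator state.
    private final List<String> buffer = new ArrayList<>();

    private transient KafkaProducer<String, String> producer;

    public BufferingKafkaSink(Properties kafkaProps, String topic) {
        this.kafkaProps = kafkaProps;
        this.topic = topic;
    }

    @Override
    public void open(Configuration parameters) {
        producer = new KafkaProducer<>(kafkaProps);
    }

    @Override
    public void invoke(String value) {
        // Do not push yet: hold the record until the checkpoint covering it completes.
        buffer.add(value);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only now is it certain that no replay will re-produce these records,
        // so they can be "committed" to the output topic.
        for (String value : buffer) {
            producer.send(new ProducerRecord<>(topic, value));
        }
        producer.flush();
        buffer.clear();
    }
}
```

The latency cost Stephan mentions is visible here: nothing reaches Kafka before notifyCheckpointComplete() fires, so the output lags by at least one checkpoint interval.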
Hello,
> I think that there is actually a fundamental latency issue with
> "exactly-once sinks", no matter how you implement them in any system:
> you can only commit once you are sure that everything went well,
> up to a specific point where you are sure no replay will ever be needed.

What if the persistent buffer in the sink were used to determine which data elements should be emitted in case of a replay? I mean, the sink pushes everything as soon as it arrives, and also writes everything to the persistent buffer, and then in case of a replay it looks into the buffer before pushing each element, and only does the push if the buffer says that the element was not pushed before.

Best,
Gábor
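For illustration, a minimal sketch of this idea. The PersistentBuffer interface, the record IDs, and the class name are hypothetical stand-ins for whatever durable store the sink would use; records are pushed as soon as they arrive, and during a replay the buffer is consulted so that records pushed before the failure are skipped.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EagerDedupSink {

    /** Hypothetical durable set of already-emitted record IDs, e.g. kept in a state backend. */
    public interface PersistentBuffer {
        boolean contains(String recordId);
        void add(String recordId);
    }

    private final PersistentBuffer emitted;
    private final KafkaProducer<String, String> producer;
    private final String topic;

    public EagerDedupSink(PersistentBuffer emitted, KafkaProducer<String, String> producer, String topic) {
        this.emitted = emitted;
        this.producer = producer;
        this.topic = topic;
    }

    public void push(String recordId, String value) {
        if (emitted.contains(recordId)) {
            // A replayed record that was already pushed before the failure: skip it.
            return;
        }
        producer.send(new ProducerRecord<>(topic, value));
        // Note: the send and this update are not atomic, and the buffer is only as
        // durable as whatever backs it -- the gap Paris points out below.
        emitted.add(recordId);
    }
}
```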
@Gabor: That assumes deterministic streams and, to some extent, deterministic tuple order. That may be given sometimes, but it is a very strong assumption in many cases.
In reply to this post by Gábor Gévay
Hi Gabor,
The sinks should be aware that the global checkpoint is indeed persisted before emitting, so they will have to wait until they are notified of its completion before pushing to Kafka. The current view of the local state is not the actual persisted view, so checking against it is like relying on dirty state. Imagine the following scenario:

1) The sink pushes record k to Kafka and updates its local buffer for k.
2) The sink snapshots k and the rest of its state on the checkpoint barrier.
3) The global checkpoint fails for some reason (e.g. another sink subtask failed) and the job gets restarted.
4) The sink pushes record k to Kafka again, since the last global snapshot did not complete and k is not in the local buffer.

Chesnay’s approach seems to be valid and best effort for the time being.

Paris
> Chesnay’s approach seems to be valid and best effort for the time being.

Chesnay’s approach is not part of this thread. Can you or Chesnay elaborate/provide a link?

– Ufuk
That would be good indeed. I just learned about it from what Stephan mentioned. It sounds correct to me along those lines, but it would be nice to see the details.
In reply to this post by Stephan Ewen
Hi,

Buffering the data (in all cases) would hurt the latency so much that Flink would effectively be reverting to micro-batching (where the batch size is the checkpoint period) with regard to the output.

My initial thoughts on how to solve this were as follows:
1) The output persists in the checkpoint the ID of the last message it wrote to Kafka.
2) Upon recovery the sink would
   2a) record the offset Kafka is at at that point in time, and
   2b) for every 'new' message, validate whether it must write the message by reading from Kafka (starting at the offset in the checkpoint); if the message is already present, it skips it.
3) If a message arrives that has not yet been written, the message is written.

Under the assumption that the messages arrive in the same order as before, the sink can now simply run as normal. This way the performance is only impacted in the (short) period after the recovery from a disturbance.

What do you think?
Best regards / Met vriendelijke groeten,
Niels Basjes
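For illustration, a rough sketch of step 1 of the approach above: a sink that tracks the ID of the last message it wrote and includes it in the checkpoint, using the Checkpointed interface Flink offered around the time of this thread (since replaced). extractId() and writeToKafka() are hypothetical placeholders, and how to derive a usable per-message ID is exactly the open question raised further down the thread.

```java
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LastIdTrackingSink extends RichSinkFunction<String> implements Checkpointed<Long> {

    private long lastWrittenId = -1L;

    @Override
    public void invoke(String value) {
        long id = extractId(value);  // hypothetical: derive a stable, ordered per-message ID
        writeToKafka(value);         // hypothetical: the actual produce call
        lastWrittenId = id;
    }

    @Override
    public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        // The checkpoint records how far the output had progressed at this moment.
        return lastWrittenId;
    }

    @Override
    public void restoreState(Long state) {
        // After a failure the sink knows the last ID written before the checkpoint;
        // replayed messages beyond this ID must be compared against what is already in Kafka.
        lastWrittenId = state;
    }

    private long extractId(String value) {
        // Hypothetical convention: the ID is the first comma-separated field of the message.
        return Long.parseLong(value.split(",", 2)[0]);
    }

    private void writeToKafka(String value) {
        // Hypothetical: producer.send(new ProducerRecord<>(topic, value));
    }
}
```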
In reply to this post by Paris Carbone
From what I understood, the state on the sinks is included in the operator state of the sinks and pushed to Kafka when the 3-phase commit is complete, i.e. when the checkpoint completion notification arrives at the sinks.

There are several pitfalls I am really curious to check and see how they are (going to be) handled; this is of course not as simple as it sounds. It really depends on the guarantees and operations the outside storage gives you. For example, how can we know that the pushed records are actually persisted in Kafka in a single transaction? Not as simple as it sounds.

@Chesnay, can you tell us more?
In reply to this post by Niels Basjes
This is not a bad take. It still makes a few assumptions:

1) the output checkpoints the last *known* ID that was *persisted* in Kafka (not just pushed)
2) we assume deterministic tuple order, as Stephan pointed out
The way I imagine this is that the sink would have its "own checkpoints" separately from the rest of the system, with a much smaller interval, and would write to Kafka (with "transactional cooperation", as Stephan mentioned) while making these checkpoints. And then when a replay happens from a global system checkpoint, it can look at its own checkpoints to decide for each tuple whether to send it or not.

@Stephan:
> That assumes deterministic streams and, to some extent, deterministic tuple order.
> That may be given sometimes, but it is a very strong assumption in many cases.

Ah yes, you are right. But doing everything based on event time points in this direction of deterministic streams, right?
In reply to this post by Niels Basjes
@Niels: I don't fully understand your approach so far. If you write a message to Kafka between two checkpoints, where do you store the information that this particular message has already been written (I think this would be the ID in your example)? Such information would need to be persisted for every written message (or very small group of messages).

Stephan
In reply to this post by Paris Carbone
Essentially what happens is the following:

In between checkpoints, all incoming data is stored within the operator state. When a checkpoint-complete notification arrives, the data is read from the operator state and written into Kafka (or any system).

If the job fails while storing records in the state, the current state is discarded and we go back to the previous one. Since no data was written yet, we fulfill exactly-once here.

If the job fails while data is being written into Cassandra (it can't be written as one atomic action), some data will persist in Cassandra and will be sent again upon restart. In this case exactly-once is not fulfilled. But we minimize the time frame in which a failure causes exactly-once to fail, which is pretty much as close as you can get without support from Kafka or others.

@Niels: we discussed having a counter that tells us how much data was written in between checkpoints. But this is currently not possible; an operator can't update its state on the fly, so we would need something new here. And there would still be cases where even this would fail, for example if the job fails after the message was sent but before the ID was saved.
In reply to this post by Stephan Ewen
@Stephan: Kafka keeps the messages for a configured TTL (e.g. a few days/weeks). So my idea is based on the fact that Kafka has all the messages and that I can read those messages from Kafka to validate whether I should or should not write them again.

Let me illustrate what I had in mind:
I write messages to Kafka, and at the moment of the checkpoint the last message ID I wrote is 5.
Then I write 6, 7, 8. FAIL.
Recover: open a reader starting at message 5.
Get message 6 -> Read from Kafka --> Already have this --> Skip
Get message 7 -> Read from Kafka --> Already have this --> Skip
Get message 8 -> Read from Kafka --> Already have this --> Skip
Get message 9 -> Read from Kafka --> Not yet in Kafka --> Write and resume normal operations.

Like I said: this is just the first rough idea I had on a possible direction for how this can be solved without the latency impact of buffering.
Best regards / Met vriendelijke groeten,
Niels Basjes
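For illustration, a minimal sketch of the recovery path illustrated above, under the assumptions stated in the thread: messages are replayed in the same (deterministic) order, the offset recorded in the checkpoint is available, and the output topic has a single partition. The ReplayDeduplicator class and its method names are hypothetical; only the Kafka client calls are real.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Properties;

public class ReplayDeduplicator {

    private final KafkaProducer<String, String> producer;
    private final String outputTopic;
    private final Deque<String> alreadyWritten = new ArrayDeque<>();

    // props must contain both producer (serializer) and consumer (deserializer) settings.
    public ReplayDeduplicator(Properties props, String outputTopic, long lastCheckpointedOffset) {
        this.outputTopic = outputTopic;
        this.producer = new KafkaProducer<>(props);

        // Read back everything written to the output topic after the checkpointed offset,
        // so replayed messages can be matched against it (messages 6, 7, 8 in the example).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition(outputTopic, 0);  // single partition assumed
            consumer.assign(Collections.singletonList(tp));
            long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
            consumer.seek(tp, lastCheckpointedOffset);
            while (consumer.position(tp) < end) {
                for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                    alreadyWritten.add(rec.value());
                }
            }
        }
    }

    /** Called for every replayed message after recovery. */
    public void maybeWrite(String replayedValue) {
        if (!alreadyWritten.isEmpty() && alreadyWritten.peekFirst().equals(replayedValue)) {
            // Already in Kafka from before the failure: skip (messages 6, 7, 8).
            alreadyWritten.pollFirst();
            return;
        }
        // Not yet in Kafka (message 9): write it and continue normal operation.
        producer.send(new ProducerRecord<>(outputTopic, replayedValue));
    }
}
```

If the replay order diverges from the original run, the head-of-queue comparison breaks down, which is the deterministic-order assumption Stephan flags earlier in the thread.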
Hi Niels!

That could actually work, given a way to identify messages with a unique ID. Would be quite an exercise to implement...

Stephan