"Close()" aborts last transaction in TwoPhaseCommitSinkFunction


Niels van Kaam
Hi,

I'm working on a custom sink implementation which I would like to use with exactly-once semantics. I have therefore extended the TwoPhaseCommitSinkFunction class, as described in this recent post: https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

I have some integration tests which run jobs using the custom sink with a finite dataset (a RichSourceFunction with a "finite" run method). The tests fail because of missing data. I noticed that this is due to the last transaction being aborted.

Looking at the source code, that makes sense: the close() implementation of TwoPhaseCommitSinkFunction calls abort on the current transaction: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.java

I could override this behaviour and commit instead, but then I would be committing without having received the checkpoint-completed notification, and thus no longer properly maintaining the exactly-once guarantees.

Is it possible (and if so, how) to get end-to-end exactly-once guarantees when dealing with (sometimes) finite jobs?

Thanks!
Niels

Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

Piotr Nowojski
Hi,

Short answer is: no, at the moment a clean shutdown is not implemented for streaming jobs, but it's on our to-do list for the future.

Hacky answer: you could implement some custom code that waits for at least one completed checkpoint after the last input record. That would require modifying the source function, or at least wrapping it, and there might be some corner cases that I haven't thought about.
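To make the idea concrete, here is a rough sketch of the coordination logic in plain Java. The Flink interfaces are stubbed out so the example stays self-contained; in a real job you would implement Flink's SourceFunction and CheckpointListener instead, and notifyCheckpointComplete would be invoked by the checkpoint coordinator, not by your own thread. All class and field names below (DrainingSource and so on) are illustrative, not actual Flink APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DrainingSourceSketch {

    // Stand-in for org.apache.flink.runtime.state.CheckpointListener.
    interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Finite source that keeps its run() method alive until one more
     *  checkpoint completes after the last record was emitted, so the
     *  sink's final transaction can be committed before shutdown. */
    static class DrainingSource implements CheckpointListener {
        private final List<Integer> data;
        private final CountDownLatch checkpointAfterLastRecord = new CountDownLatch(1);
        volatile boolean doneEmitting = false;

        DrainingSource(List<Integer> data) {
            this.data = data;
        }

        // Stand-in for SourceFunction.run(SourceContext): emit everything,
        // then block until a checkpoint covering the last record completes.
        List<Integer> run() throws InterruptedException {
            List<Integer> emitted = new ArrayList<>(data);
            doneEmitting = true;
            // Bounded wait, so a cancelled job can still shut down.
            checkpointAfterLastRecord.await(30, TimeUnit.SECONDS);
            return emitted;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            // Only release run() once all input has been emitted; earlier
            // checkpoints do not cover the tail of the data.
            if (doneEmitting) {
                checkpointAfterLastRecord.countDown();
            }
        }

        // Stand-in for SourceFunction.cancel(): unblock run() immediately.
        void cancel() {
            checkpointAfterLastRecord.countDown();
        }
    }

    public static void main(String[] args) throws Exception {
        DrainingSource source = new DrainingSource(Arrays.asList(1, 2, 3));

        // Simulate the checkpoint coordinator: complete a checkpoint
        // shortly after the source has finished emitting.
        Thread coordinator = new Thread(() -> {
            try {
                while (!source.doneEmitting) {
                    Thread.sleep(10);
                }
                source.notifyCheckpointComplete(1L);
            } catch (Exception ignored) {
            }
        });
        coordinator.start();

        List<Integer> out = source.run();
        coordinator.join();
        if (!out.equals(Arrays.asList(1, 2, 3))) {
            throw new AssertionError("unexpected output: " + out);
        }
        System.out.println(out);
    }
}
```

The key point is the latch: run() must not return until notifyCheckpointComplete has fired after the last record, because TwoPhaseCommitSinkFunction only commits on that notification, and close() aborts whatever transaction is still open.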

Piotrek

On 9 Mar 2018, at 14:49, Niels van Kaam <[hidden email]> wrote:

Re: "Close()" aborts last transaction in TwoPhaseCommitSinkFunction

Niels van Kaam
Thank you! I already have a custom source function, so adding the hacky solution would not be too much effort.

Looking forward to the "proper" solution!

Niels

On Fri, Mar 9, 2018, 16:00 Piotr Nowojski <[hidden email]> wrote: