Fault tolerance & idempotency on window functions

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

Fault tolerance & idempotency on window functions

Kamil Dziublinski
Hi guys,

I have a flink streaming job that reads from kafka, creates some statistics increments and stores this in hbase (using normal puts).
I'm using fold function here of with window of few seconds.

My tests showed me that restoring state with window functions is not exactly working how I expected.
I thought that if my window functions emits an aggregated object to a sink, and that object fails in a sink, this write to hbase will be replayed. So even if it actually got written to HBase, but flink thought it didnt (for instance during network problem) I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event used in that window for aggregation. 

Now correct me if I'm wrong but it seems that in the case of failure (even if its in sink) whole flow is getting replayed from last checkpoint which means that my window function might evict aggregated object in a different form. For instance not only having tuples that failed but also other ones, which would break my idempotency her and I might end up with having higher counters than I should have.

Do you have any suggestion on how to solve/workaround such problem in flink?

Thanks,
Kamil.


Reply | Threaded
Open this post in threaded view
|

Re: Fault tolerance & idempotency on window functions

Aljoscha Krettek
Hi,
Yes, your analysis is correct: Flink will not retry for individual elements but will restore from the latest consistent checkpoint in case of failure. This also means that you can get different window results based on which element arrives first, i.e. you have a different timestamp on your output in that case.

One simple mitigation for the timestamp problem is to use the largest timestamp of elements within a window instead of the first timestamp. This will be stable across restores even if the order of arrival of elements changes. You can still get problems when it comes to late data and window triggering, if you cannot guarantee that your watermark is 100 % correct, though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives late that was not considered when doing the first processing that failed.

Best,
Aljoscha

> On 25. Apr 2017, at 19:54, Kamil Dziublinski <[hidden email]> wrote:
>
> Hi guys,
>
> I have a flink streaming job that reads from kafka, creates some statistics increments and stores this in hbase (using normal puts).
> I'm using fold function here of with window of few seconds.
>
> My tests showed me that restoring state with window functions is not exactly working how I expected.
> I thought that if my window functions emits an aggregated object to a sink, and that object fails in a sink, this write to hbase will be replayed. So even if it actually got written to HBase, but flink thought it didnt (for instance during network problem) I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event used in that window for aggregation.
>
> Now correct me if I'm wrong but it seems that in the case of failure (even if its in sink) whole flow is getting replayed from last checkpoint which means that my window function might evict aggregated object in a different form. For instance not only having tuples that failed but also other ones, which would break my idempotency her and I might end up with having higher counters than I should have.
>
> Do you have any suggestion on how to solve/workaround such problem in flink?
>
> Thanks,
> Kamil.
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Fault tolerance & idempotency on window functions

Kamil Dziublinski
Big thanks for replying Aljoscha, I spend quite some time on thinking how to solve this problem and came to some conclusions. Would be cool if you can verify if my logic is correct.

I decided that if I will partition data in kafka in the same way as I partition my window with keyby. It's tenant, user combination (I would still use hash out of it in kafka producer) and I will switch processing to event time (currently it was processing time) then during replay I could be 100% sure that first element will always be first, and watermark for triggering the window would also come at the same moment. This giving me idempotent writes of this batched object to HBase.

And for late events (by configuring lateness on the window itself) I would configure the trigger to fire & purge, so that it doesn't hold fired data. This way if late event arrives I could fire this late event with a different timestamp treating it in hbase as totally separate increment, not overriding my previous data. 
The reason I want to purge data here on firing, is cause I would need to have allowed lateness on window of at least 2 months. So holding all data after firing for 2 months would be too costly.
Additional question here, is there any cost to having allowed lateness very high (like 2 months) if we configure trigger to fire & purge. Like any additional state or metadata that flinks need to maintain that would take much memory from the cluster? Would I have to consider rocksdb here for state or FS state could still work?

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <[hidden email]> wrote:
Hi,
Yes, your analysis is correct: Flink will not retry for individual elements but will restore from the latest consistent checkpoint in case of failure. This also means that you can get different window results based on which element arrives first, i.e. you have a different timestamp on your output in that case.

One simple mitigation for the timestamp problem is to use the largest timestamp of elements within a window instead of the first timestamp. This will be stable across restores even if the order of arrival of elements changes. You can still get problems when it comes to late data and window triggering, if you cannot guarantee that your watermark is 100 % correct, though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives late that was not considered when doing the first processing that failed.

Best,
Aljoscha
> On 25. Apr 2017, at 19:54, Kamil Dziublinski <[hidden email]> wrote:
>
> Hi guys,
>
> I have a flink streaming job that reads from kafka, creates some statistics increments and stores this in hbase (using normal puts).
> I'm using fold function here of with window of few seconds.
>
> My tests showed me that restoring state with window functions is not exactly working how I expected.
> I thought that if my window functions emits an aggregated object to a sink, and that object fails in a sink, this write to hbase will be replayed. So even if it actually got written to HBase, but flink thought it didnt (for instance during network problem) I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event used in that window for aggregation.
>
> Now correct me if I'm wrong but it seems that in the case of failure (even if its in sink) whole flow is getting replayed from last checkpoint which means that my window function might evict aggregated object in a different form. For instance not only having tuples that failed but also other ones, which would break my idempotency her and I might end up with having higher counters than I should have.
>
> Do you have any suggestion on how to solve/workaround such problem in flink?
>
> Thanks,
> Kamil.
>
>

Reply | Threaded
Open this post in threaded view
|

Re: Fault tolerance & idempotency on window functions

Aljoscha Krettek
Hi,
When keying, keep in mind that Kafka and Flink might use a different scheme for hashing. For example, Flink also applies a murmur hash on the hash code retrieved from the key and then has some internal logic for assigning that hash to a key group (the internal unit of key partitioning). I don’t know what Kafka does internally for hashing.

Also keep in mind that even with event time, the events are not ordered by event time. So the event that arrives first does not necessarily have the lowest timestamp. Using event-time just means that we wait for the watermark to trigger window computation.

Regarding state size, if you don’t use merging windows (for example, session windows) then the only state that is kept for a purged window is a cleanup timer that is set for “end of window + allowed lateness”. That is, the state size does not increase with increasing allowed lateness if you purge. This could still fit into the heap state backend and you don’t necessarily need to consider RocksDB.

Best,
Aljoscha
On 29. Apr 2017, at 10:19, Kamil Dziublinski <[hidden email]> wrote:

Big thanks for replying Aljoscha, I spend quite some time on thinking how to solve this problem and came to some conclusions. Would be cool if you can verify if my logic is correct.

I decided that if I will partition data in kafka in the same way as I partition my window with keyby. It's tenant, user combination (I would still use hash out of it in kafka producer) and I will switch processing to event time (currently it was processing time) then during replay I could be 100% sure that first element will always be first, and watermark for triggering the window would also come at the same moment. This giving me idempotent writes of this batched object to HBase.

And for late events (by configuring lateness on the window itself) I would configure the trigger to fire & purge, so that it doesn't hold fired data. This way if late event arrives I could fire this late event with a different timestamp treating it in hbase as totally separate increment, not overriding my previous data. 
The reason I want to purge data here on firing, is cause I would need to have allowed lateness on window of at least 2 months. So holding all data after firing for 2 months would be too costly.
Additional question here, is there any cost to having allowed lateness very high (like 2 months) if we configure trigger to fire & purge. Like any additional state or metadata that flinks need to maintain that would take much memory from the cluster? Would I have to consider rocksdb here for state or FS state could still work?

On Fri, Apr 28, 2017 at 5:54 PM Aljoscha Krettek <[hidden email]> wrote:
Hi,
Yes, your analysis is correct: Flink will not retry for individual elements but will restore from the latest consistent checkpoint in case of failure. This also means that you can get different window results based on which element arrives first, i.e. you have a different timestamp on your output in that case.

One simple mitigation for the timestamp problem is to use the largest timestamp of elements within a window instead of the first timestamp. This will be stable across restores even if the order of arrival of elements changes. You can still get problems when it comes to late data and window triggering, if you cannot guarantee that your watermark is 100 % correct, though. I.e. it might be that, upon restore, an element with an even larger timestamp arrives late that was not considered when doing the first processing that failed.

Best,
Aljoscha
> On 25. Apr 2017, at 19:54, Kamil Dziublinski <[hidden email]> wrote:
>
> Hi guys,
>
> I have a flink streaming job that reads from kafka, creates some statistics increments and stores this in hbase (using normal puts).
> I'm using fold function here of with window of few seconds.
>
> My tests showed me that restoring state with window functions is not exactly working how I expected.
> I thought that if my window functions emits an aggregated object to a sink, and that object fails in a sink, this write to hbase will be replayed. So even if it actually got written to HBase, but flink thought it didnt (for instance during network problem) I could be sure of idempotent writes. I wanted to enforce that by using the timestamp of the first event used in that window for aggregation.
>
> Now correct me if I'm wrong but it seems that in the case of failure (even if its in sink) whole flow is getting replayed from last checkpoint which means that my window function might evict aggregated object in a different form. For instance not only having tuples that failed but also other ones, which would break my idempotency her and I might end up with having higher counters than I should have.
>
> Do you have any suggestion on how to solve/workaround such problem in flink?
>
> Thanks,
> Kamil.
>
>