Hi,

I have a use case where we are reading events from a Kinesis stream. An event can look like this:

Event { event_id, transaction_id, key1, key2, value, timestamp, some other fields... }

We want to aggregate the values per key over all events we have seen so far (as simple as "select key1, key2, sum(value) from events group by key1, key2"). For this I have created a simple Flink job which uses the flink-kinesis connector and applies keyBy() and sum() on the incoming events. I am facing two challenges here:

1) The incoming events can have duplicates. How do I maintain exactly-once processing here, since processing duplicate events can give me an erroneous result? The field transaction_id will be unique for each event. If two events have the same transaction_id, we can say that they are duplicates. (By duplicates here, I don't just mean the retried ones. The same message can be present in Kinesis with different sequence numbers. I am not sure if the flink-kinesis connector can handle that, as it is using KCL underneath, which I assume doesn't take care of it.)

2) There can be cases where the value has been updated for a key after the event was processed and we may want to reprocess those events with the new value. Since this is just a value change, the transaction_id will be the same, so the idempotency logic will not allow us to reprocess the events.

What are the ways to handle such scenarios in Flink?

Thanks,
Pooja

Warm Regards,
Pooja Agrawal
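For reference, a minimal sketch of the kind of job described above, assuming the Java DataStream API and the FlinkKinesisConsumer from flink-connector-kinesis. The Event POJO, the stand-in parser, and the property values are illustrative assumptions, not code from the thread.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class KeyedSumJob {

    // Simplified event POJO; the real event has more fields (event_id, timestamp, ...).
    public static class Event {
        public String transactionId;
        public String key1;
        public String key2;
        public long value;

        public Event() {}

        public Event(String transactionId, String key1, String key2, long value) {
            this.transactionId = transactionId;
            this.key1 = key1;
            this.key2 = key2;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
        consumerConfig.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");

        env.addSource(new FlinkKinesisConsumer<>("events", new SimpleStringSchema(), consumerConfig))
                // Stand-in parser: real code would deserialize the Kinesis record payload into an Event.
                .map(line -> new Event("txn-1", "k1", "k2", 1L))
                .returns(Event.class)
                // "select key1, key2, sum(value) from events group by key1, key2"
                .keyBy("key1", "key2")
                .sum("value")
                .print();

        env.execute("keyed-sum-per-key");
    }
}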
Hi Pooja,

I'm a bit confused, since 1) says "If two events have the same transaction_id, we can say that they are duplicates", while 2) says "Since this is just a value change, the transaction_id will be the same". These look conflicting to me. Usually in scenario 2) the value-update event is treated as a new event which does not share the unique id with prior events.

If each event has a unique transaction_id, you can use it to de-duplicate the events via a set recording the transaction_id(s) which are already processed. And then 2) would not be a problem under the unique-transaction_id assumption.

Thanks,
Zhu Zhu

On Tue, Dec 17, 2019 at 9:17 PM Pooja Agrawal <[hidden email]> wrote:
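One way to realize the "set of already-processed transaction_id(s)" inside Flink keyed state rather than in an external store is to key the stream by transaction_id and keep a per-key "seen" flag. This is a sketch only; the Event POJO and its transactionId field are assumptions carried over from the sketch above, and the state here grows with the number of distinct transaction_ids.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class DeduplicateByTransactionId
        extends RichFlatMapFunction<KeyedSumJob.Event, KeyedSumJob.Event> {

    // One flag per transaction_id (the stream key). Unbounded unless a TTL is configured.
    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen-transaction-id", Boolean.class));
    }

    @Override
    public void flatMap(KeyedSumJob.Event event, Collector<KeyedSumJob.Event> out) throws Exception {
        if (seen.value() == null) {       // first occurrence of this transaction_id
            seen.update(true);
            out.collect(event);
        }
        // later occurrences are dropped silently
    }
}

// Usage, before the aggregation:
//   events.keyBy("transactionId")
//         .flatMap(new DeduplicateByTransactionId())
//         .keyBy("key1", "key2")
//         .sum("value");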
Hey,

I am sorry for the confusion. The value is not already present in the event; we are reading it from a static table (a kind of data enrichment in the Flink job). The event above is an enriched event. If we say this is some transaction event, the user would have done the transaction once, and hence the transaction_id is unique. But the table from which we are reading the value may contain a wrong value (not always, sometimes because of a bug). In that case we may want to reprocess the transaction event with the new value (here the transaction_id will be the same as before, but the value will change). I hope this clears up the scenario. Let me know if you have any other questions.

To solve the idempotency problem, you suggested maintaining a set recording the transaction_id(s). Since we are aggregating over all events seen till now, the number of events, and hence of ids, will be very large. I am assuming we will need some external store here and a lookup every time we process an event, which may increase the latency. Can you suggest an efficient way to solve this? And if we do need an external store, what would be the best candidate?

Thanks,
Pooja

On Wed, Dec 18, 2019 at 8:19 AM Zhu Zhu <[hidden email]> wrote:
Warm Regards,
Pooja Agrawal
Hi Pooja,

Here's an implementation from Jamie Grier for de-duplication using an in-memory cache with some expiration time:

If for your use case you can limit the time period in which duplicates may happen, you can use this approach.

Thanks,
Rafi

On Wed, Dec 18, 2019 at 8:16 AM Pooja Agrawal <[hidden email]> wrote:
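The general shape of such an expiring in-memory de-duplication could be a filter that remembers recently seen transaction_ids. This is a rough sketch of the idea only, not Jamie Grier's actual code, and it assumes Guava's CacheBuilder is on the classpath and reuses the hypothetical Event POJO from above. Note that an operator-local cache is not part of Flink's checkpointed state, so duplicates can slip through after a restart.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

import java.util.concurrent.TimeUnit;

public class ExpiringCacheDeduplicator extends RichFilterFunction<KeyedSumJob.Event> {

    private transient Cache<String, Boolean> recentTransactionIds;

    @Override
    public void open(Configuration parameters) {
        // Remember transaction_ids only for the window in which duplicates can realistically occur.
        recentTransactionIds = CacheBuilder.newBuilder()
                .expireAfterWrite(1, TimeUnit.HOURS)   // expiration period is an assumption
                .maximumSize(10_000_000)
                .build();
    }

    @Override
    public boolean filter(KeyedSumJob.Event event) {
        boolean firstTimeSeen = recentTransactionIds.getIfPresent(event.transactionId) == null;
        if (firstTimeSeen) {
            recentTransactionIds.put(event.transactionId, Boolean.TRUE);
        }
        return firstTimeSeen;   // keep only the first occurrence within the expiry window
    }
}

// Usage: key by transaction_id first so all duplicates of an id reach the same subtask:
//   events.keyBy("transactionId").filter(new ExpiringCacheDeduplicator()) ...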
Hi Pooja,

My main confusion is: if 2 events have the same transaction_id, how can we tell whether it is a wanted one carrying a value update, or an unwanted duplicate?

MapState with a TTL can be used for de-duplicating, provided that a duplicate event would not arrive too long after the original event was processed.

Thanks,
Zhu Zhu

On Wed, Dec 18, 2019 at 3:50 PM Rafi Aroch <[hidden email]> wrote:
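A sketch of the MapState-with-TTL idea, refining the earlier keyed-state sketch so that the de-duplication state does not grow forever. It assumes the stream is keyed by the aggregation key (key1, key2), reuses the hypothetical Event POJO, and the 7-day retention, state names, and TTL settings are illustrative assumptions.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TtlDeduplicator extends RichFlatMapFunction<KeyedSumJob.Event, KeyedSumJob.Event> {

    // Per (key1, key2) key: the transaction_ids already processed, expired automatically by TTL.
    private transient MapState<String, Boolean> seenTransactionIds;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(7))   // retention period is an assumption
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor<>("seen-transaction-ids", String.class, Boolean.class);
        descriptor.enableTimeToLive(ttl);

        seenTransactionIds = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void flatMap(KeyedSumJob.Event event, Collector<KeyedSumJob.Event> out) throws Exception {
        if (!seenTransactionIds.contains(event.transactionId)) {
            seenTransactionIds.put(event.transactionId, Boolean.TRUE);
            out.collect(event);   // only the first occurrence within the TTL is forwarded
        }
    }
}

// Usage:
//   events.keyBy("key1", "key2")
//         .flatMap(new TtlDeduplicator())
//         .keyBy("key1", "key2")
//         .sum("value");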