Fwd: How to reprocess certain events in Flink?

Pooja Agrawal

Hi,

I have a use case where we are reading events from a Kinesis stream. An event looks like this:
Event {
  event_id,
  transaction_id,
  key1,
  key2,
  value,
  timestamp,
  some other fields...
}

We want to aggregate the values per key over all events seen so far (as simple as "select key1, key2, sum(value) from events group by key1, key2"). For this I have created a simple Flink job that uses the flink-kinesis connector and applies keyBy() and sum() to the incoming events. I am facing two challenges here:

1) The incoming events can contain duplicates. How can we maintain exactly-once processing here, given that processing duplicate events gives erroneous results? The field transaction_id is unique for each event: if two events have the same transaction_id, they are duplicates. (By duplicates I don't just mean retried records. The same message can be present in Kinesis with different sequence numbers. I am not sure the flink-kinesis connector can handle that, as I believe it uses the KCL underneath, which I assume doesn't take care of it.)

2) There can be cases where the value for a key is updated after the event has been processed, and we may want to reprocess those events with the new value. Since this is only a value change, the transaction_id stays the same, so the idempotency logic will not allow the events to be reprocessed. What are the ways to handle such scenarios in Flink?
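
To make challenge 1) concrete, here is a minimal sketch in plain Python (not Flink API). The function name `aggregate` and the sample events are made up for illustration; it only mimics a running sum per (key1, key2) and shows how a duplicated transaction inflates the result.

```python
from collections import defaultdict

def aggregate(events):
    """Running sum of `value` per (key1, key2), with no de-duplication."""
    totals = defaultdict(int)
    for e in events:
        totals[(e["key1"], e["key2"])] += e["value"]
    return dict(totals)

# Hypothetical sample data: transaction t2 is delivered twice.
events = [
    {"transaction_id": "t1", "key1": "a", "key2": "x", "value": 10},
    {"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5},
    {"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5},
]

# The duplicate is counted twice: the sum is 20 instead of the intended 15.
print(aggregate(events))
```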

Thanks
Pooja


--
Warm Regards,
Pooja Agrawal

Re: How to reprocess certain events in Flink?

Zhu Zhu
Hi Pooja,

I'm a bit confused: 1) says that "If two events have same transaction_id, we can say that they are duplicates", while 2) says that "Since this is just a value change, the transaction_id will be same". These two statements look conflicting to me. Usually, in scenarios like 2), the value update is treated as a new event that does not share its unique id with prior events.

If each event has a unique transaction_id, you can use it to de-duplicate events via a set that records the transaction_ids already processed. Then 2) would not be a problem under the unique transaction_id assumption.
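
The set-based idea could be sketched roughly as follows, in plain Python rather than Flink API. The class name `DedupSum` and its fields are hypothetical; in a real job the set would presumably live in Flink keyed state instead of an in-process Python set.

```python
from collections import defaultdict

class DedupSum:
    """Sums `value` per (key1, key2), skipping transaction_ids already seen."""

    def __init__(self):
        self.seen = set()               # transaction_ids already processed
        self.totals = defaultdict(int)  # (key1, key2) -> running sum

    def process(self, event):
        """Return True if the event was applied, False if dropped as a duplicate."""
        tid = event["transaction_id"]
        if tid in self.seen:
            return False
        self.seen.add(tid)
        self.totals[(event["key1"], event["key2"])] += event["value"]
        return True

agg = DedupSum()
for e in [
    {"transaction_id": "t1", "key1": "a", "key2": "x", "value": 10},
    {"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5},
    {"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5},  # duplicate
]:
    agg.process(e)

print(dict(agg.totals))
```

Note that the set grows with every distinct transaction_id, so it is unbounded unless old ids are expired in some way.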

Thanks,
Zhu Zhu

In reply to Pooja Agrawal <[hidden email]>, Tue, Dec 17, 2019 at 9:17 PM

Re: How to reprocess certain events in Flink?

Pooja Agrawal
Hey,

I am sorry for the confusion. The value is not present in the incoming event. We read it from a static table (a kind of data enrichment inside the Flink job). The event shown above is the enriched event.
Suppose this is a transaction event: the user performs the transaction once, so the transaction_id is unique. But the table we read the value from may contain a wrong value (not always, only sometimes, because of a bug). In that case we may want to reprocess that transaction event with the new value (the transaction_id is the same as before, but the value changes). I hope this clarifies the scenario. Let me know if you have any other questions.

To solve the idempotency problem, you suggested maintaining a set of transaction_ids. Since we are aggregating over all events seen so far, the number of events, and hence of ids, will be too large. I assume we would need an external store and a lookup for every event we process, which may increase latency. Can you suggest an efficient way to solve this? And if we do need an external store, what would be the best candidate?

Thanks
Pooja



In reply to Zhu Zhu <[hidden email]>, Wed, Dec 18, 2019 at 8:19 AM

Re: How to reprocess certain events in Flink?

Rafi Aroch
Hi Pooja,

Here's an implementation from Jamie Grier for de-duplication using an in-memory cache with an expiration time:

If your use case lets you bound the time period in which duplicates may occur, you can use this approach.
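
As a rough illustration of the general idea only (this is not the linked implementation, and it is plain Python rather than Flink API), a cache that forgets ids after a fixed time could look like the sketch below. The class name `ExpiringDedup`, the `ttl_seconds` parameter, and the explicit `now` argument are assumptions made for the example.

```python
class ExpiringDedup:
    """Remembers each transaction_id for `ttl_seconds` after it was first seen."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.first_seen = {}  # transaction_id -> time it was first seen

    def is_duplicate(self, transaction_id, now):
        # Evict ids whose TTL has elapsed. A full scan keeps the sketch short;
        # a real cache would expire entries more efficiently.
        expired = [t for t, ts in self.first_seen.items() if now - ts >= self.ttl]
        for t in expired:
            del self.first_seen[t]

        if transaction_id in self.first_seen:
            return True
        self.first_seen[transaction_id] = now
        return False

dedup = ExpiringDedup(ttl_seconds=60)
print(dedup.is_duplicate("t1", now=0))   # first sighting
print(dedup.is_duplicate("t1", now=30))  # repeated within the TTL
print(dedup.is_duplicate("t1", now=61))  # repeated after the TTL has elapsed
```

The trade-off is visible in the last call: a duplicate that arrives after the expiration time is no longer recognized, so memory stays bounded only at the cost of missing late duplicates.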

Thanks,
Rafi


In reply to Pooja Agrawal <[hidden email]>, Wed, Dec 18, 2019 at 8:16 AM

Re: How to reprocess certain events in Flink?

Zhu Zhu
Hi Pooja,

My main confusion is: if two events have the same transaction_id, how can we tell whether the second one is a wanted value update or an unwanted duplicate?
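
One possible convention, stated here purely as an assumption and not something settled in this thread, would be to compare values: a repeated transaction_id with an unchanged value is treated as a duplicate, while one with a changed value is treated as a correction and applied as a delta to the running sum. The plain-Python sketch below (not Flink API; `CorrectingSum` is a hypothetical name) also assumes that key1 and key2 never change for a given transaction_id.

```python
from collections import defaultdict

class CorrectingSum:
    """Running sum per (key1, key2) that accepts corrected values."""

    def __init__(self):
        self.applied = {}               # transaction_id -> value currently in the sum
        self.totals = defaultdict(int)  # (key1, key2) -> running sum

    def process(self, event):
        tid = event["transaction_id"]
        key = (event["key1"], event["key2"])
        previous = self.applied.get(tid)

        if previous == event["value"]:
            return "duplicate"  # same id, same value: ignore

        # New event: add the full value. Correction: add only the difference.
        self.totals[key] += event["value"] - (previous or 0)
        self.applied[tid] = event["value"]
        return "new" if previous is None else "correction"

agg = CorrectingSum()
results = [
    agg.process({"transaction_id": "t1", "key1": "a", "key2": "x", "value": 10}),
    agg.process({"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5}),
    agg.process({"transaction_id": "t2", "key1": "a", "key2": "x", "value": 5}),
    agg.process({"transaction_id": "t2", "key1": "a", "key2": "x", "value": 7}),
]
print(results, dict(agg.totals))
```

This convention cannot recognize a stale duplicate that carries an older, already corrected value, so it only helps if such replays are not expected.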

MapState with a TTL can be used for de-duplication, provided that a duplicate event does not arrive too long after the original event was processed.

Thanks,
Zhu Zhu

In reply to Rafi Aroch <[hidden email]>, Wed, Dec 18, 2019 at 3:50 PM