Handling stale data enrichment

Handling stale data enrichment

Vinay Patil
Hi,

I went through Konstantin's webinar on 99 ways you can do enrichment. One thing I am failing to understand is how to efficiently handle stale data enrichment.

Context: Let's say I want to enrich user data with subscription data. Here the subscription data acts as reference data, and the two streams are joined based on event time. Consider the following scenario:

  1. We are going to enrich Click Stream events (containing user_info) with Subscription details
  2. Subscription status for user Alice is FREE
  3. Current internal state contains Alice with subscription status FREE
  4. Reference data is not flowing for 2 hours because of some issue
  5. Alice upgraded the subscription to PREMIUM at 10:30 AM
  6. A watched-video event comes in for Alice at 10:40 AM
    • The Flink pipeline looks up the internal state and writes to the enrichment topic
    • The enrichment topic now contains Alice -> FREE
  7. Reference data starts flowing in at 11 AM
    • Let's assume we accept late elements up to 2 hours, so the click stream event of Alice is still buffered in state
    • The enrichment topic will now contain duplicate records for Alice because of multiple firings of the window:
      1. Alice -> FREE -> 10 AM
      2. Alice -> PREMIUM -> 11 AM

The question is: how do I avoid sending duplicate records? I am not able to understand it. I can think of low-level joins, but I am not sure how we would know from the timestamp (watermark) whether the data is stale, since it can happen that a particular record is not updated for 6 hours.

Regards,
Vinay Patil

Re: Handling stale data enrichment

Konstantin Knauf
Hi Vinay,

I assume your subscription updates also have a timestamp and a watermark. Otherwise, there is no way for Flink to tell that the subscription updates are late.

If you use a "temporal table"-style join to join the two streams and you do not receive any subscription updates for 2 hours, the watermark will not advance (it is the minimum of the watermarks of the two input streams), and hence all click events will be buffered and nothing is emitted. This has the advantage of not sending out duplicate records, but the disadvantage that you do not make any progress until you see fresh subscription data. Is this the desired behavior for your use case?
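To make the "minimum of the two input streams" point concrete, here is a minimal plain-Java sketch (no Flink dependency). The class name `TwoInputWatermark` and its methods are made up for illustration and are not Flink API; timestamps are minutes since midnight.

```java
// Plain-Java sketch (hypothetical names, not Flink API) of how a
// two-input operator derives its event-time clock from both inputs.
class TwoInputWatermark {
    private long clickWatermark = Long.MIN_VALUE;
    private long subscriptionWatermark = Long.MIN_VALUE;

    // Watermarks per input only ever move forward.
    void onClickWatermark(long ts) {
        clickWatermark = Math.max(clickWatermark, ts);
    }

    void onSubscriptionWatermark(long ts) {
        subscriptionWatermark = Math.max(subscriptionWatermark, ts);
    }

    // The operator's watermark is the minimum over both inputs,
    // so the slower input decides how far event time has progressed.
    long current() {
        return Math.min(clickWatermark, subscriptionWatermark);
    }

    // A buffered click can only be joined and emitted once the
    // combined watermark has reached the click's timestamp.
    boolean canEmit(long clickTimestamp) {
        return current() >= clickTimestamp;
    }
}
```

With the subscription watermark stuck at 10:00 (600) and the click watermark at 10:45 (645), `canEmit(640)` stays false for the 10:40 click until the subscription side catches up.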

Best,

Konstantin


On Thu, Apr 23, 2020 at 1:29 PM Vinay Patil <[hidden email]> wrote:


Re: Handling stale data enrichment

Vinay Patil
Hi Konstantin,

Thank you for your answer.

Yes, we have timestamps in the subscription stream.

> the disadvantage that you do not make any progress until you see fresh subscription data. Is this the desired behavior for your use case?
No, this is not acceptable, because the subscription data may be slow-changing. Let's say it is not updated for 6 hours. In this case the click stream events keep flowing, and we want to enrich them against the slow-moving stream.

In the case of event-time joins/low-level joins, I am assuming that the watermarks will still make progress because of https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources. Or do we still have to handle it in the assigner and emit a watermark if we have not received elements for a while? (I am not sure how this will work in the case of low-level joins.)
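As a rough illustration of the idea behind idle inputs, here is a plain-Java sketch (no Flink dependency) in which an input marked idle is left out of the minimum. The class `IdleAwareWatermark` and its methods are hypothetical names for illustration only, and how an input gets marked idle is left open.

```java
import java.util.Arrays;

// Plain-Java sketch (hypothetical names, not Flink API): the combined
// watermark is the minimum over the inputs that are not marked idle.
class IdleAwareWatermark {
    private final long[] watermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermark(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // A new watermark makes the input active again.
    void onWatermark(int input, long ts) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], ts);
    }

    // Assumption: something upstream decides when an input is idle.
    void markIdle(int input) {
        idle[input] = true;
    }

    // Minimum over active inputs; the result never moves backwards,
    // even if a formerly idle input resumes with an older watermark.
    long current() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            lastEmitted = Math.max(lastEmitted, min);
        }
        return lastEmitted;
    }
}
```

The trade-off is visible in the sketch: once the click side alone drives the watermark, subscription updates that resume with older timestamps arrive behind the watermark and have to be treated as late data.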

I am considering the low-level join approach using connected streams, in which I keep the reference data in state (processElement1), buffer the click stream events (processElement2), and join the two. In this case I will buffer the click stream events for a configurable period of time and then delete them. (This is to handle late records.)
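One possible variant of this idea, sketched in plain Java without Flink: hold each click for the configured period and emit it exactly once, joined against the newest subscription version at or before the click's timestamp. The class `BufferedEnrichmentJoin`, the `holdTime` parameter and the `UNKNOWN` fallback are assumptions made for illustration; in a real job the maps would live in keyed state and `onWatermark` would correspond to an event-time timer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (hypothetical names) of a buffered enrichment join.
class BufferedEnrichmentJoin {
    // Hypothetical click record: user plus event time.
    static final class Click {
        final String user;
        final long ts;
        Click(String user, long ts) { this.user = user; this.ts = ts; }
    }

    private final long holdTime;
    // Per user: subscription versions ordered by event time.
    private final Map<String, TreeMap<Long, String>> subscriptions = new HashMap<>();
    private final List<Click> buffered = new ArrayList<>();

    BufferedEnrichmentJoin(long holdTime) { this.holdTime = holdTime; }

    // Analogue of processElement1: remember the reference data.
    void onSubscription(String user, String status, long ts) {
        subscriptions.computeIfAbsent(user, u -> new TreeMap<>()).put(ts, status);
    }

    // Analogue of processElement2: buffer the click, emit nothing yet.
    void onClick(String user, long ts) {
        buffered.add(new Click(user, ts));
    }

    // Analogue of a timer: emit and drop clicks whose hold time is over.
    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Click> it = buffered.iterator();
        while (it.hasNext()) {
            Click c = it.next();
            if (c.ts + holdTime <= watermark) {
                TreeMap<Long, String> versions = subscriptions.get(c.user);
                // Newest version with timestamp <= click timestamp.
                Map.Entry<Long, String> v =
                        versions == null ? null : versions.floorEntry(c.ts);
                out.add(c.user + " -> " + (v == null ? "UNKNOWN" : v.getValue()));
                it.remove();
            }
        }
        return out;
    }
}
```

This trades latency for correctness: every click waits for `holdTime`, but a subscription update that shows up within that period is picked up and only one enriched record is written per click.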

I think the downstream consumer of the enriched data will have to deduplicate the records, or else we will end up with stale enrichment.
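A minimal plain-Java sketch of such consumer-side deduplication, assuming each enriched record carries a stable event id and a version (for example the time of the subscription data used). The class `LatestEnrichmentView` and the record fields are hypothetical; the real record layout is not specified in this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch (hypothetical names): keep only the newest
// enrichment per click event, so a later re-emission replaces the
// stale one and exact repeats are ignored.
class LatestEnrichmentView {
    // Hypothetical enriched record layout.
    static final class Enriched {
        final String eventId;
        final String status;
        final long version;
        Enriched(String eventId, String status, long version) {
            this.eventId = eventId;
            this.status = status;
            this.version = version;
        }
    }

    private final Map<String, Enriched> latest = new LinkedHashMap<>();

    // Returns true if the record changed the view, false if it was
    // a duplicate or older than what is already stored.
    boolean accept(Enriched r) {
        Enriched current = latest.get(r.eventId);
        if (current != null && current.version >= r.version) {
            return false;
        }
        latest.put(r.eventId, r);
        return true;
    }

    // Status currently stored for the event, or null if unseen.
    String statusOf(String eventId) {
        Enriched e = latest.get(eventId);
        return e == null ? null : e.status;
    }

    int size() {
        return latest.size();
    }
}
```

For the Alice example, the FREE record is accepted first, the PREMIUM record with the higher version replaces it, and a replayed FREE record is dropped.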

Regards,
Vinay Patil


On Fri, Apr 24, 2020 at 12:14 PM Konstantin Knauf <[hidden email]> wrote: