Windows, watermarks, and late data

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

Windows, watermarks, and late data

Michael Radford
I'm evaluating Flink for a reporting application that will keep
various aggregates updated in a database. It will be consuming from
Kafka queues that are replicated from remote data centers, so in case
there is a long outage in replication, I need to decide what to do
about windowing and late data.

If I use Flink's built-in windows and watermarks, any late data will
be come in 1-element windows, which could overwhelm the database if a
large batch of late data comes in and they are each mapped to
individual database updates.

As far as I can tell, I have two options:

1. Ignore late data, by marking it as late in an
AssignerWithPunctuatedWatermarks function, and then discarding it in a
flatMap operator. In this scenario, I would rely on a batch process to
fill in the missing data later, in the lambda architecture style.

2. Implement my own watermark logic to allow full windows of late
data. It seems like I could, for example, emit a "tick" message that
is replicated to all partitions every n messages, and then a custom
Trigger could decide when to purge each window based on the ticks and
a timeout duration. The system would never emit a real Watermark.

My questions are:
- Am I mistaken about either of these, or are there any other options
I'm not seeing for avoiding 1-element windows?
- For option 2, are there any problems with not emitting actual
watermarks, as long as the windows are eventually purged by a trigger?

Thanks,
Mike
Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Aljoscha Krettek
Hi,
I did some initial work on extending the EventTimeTrigger a bit to allow more complex behavior. Specifically, this allows setting an “allowed lateness” after which elements should no longer lead to windows being emitted. Also, it allows to specify to keep an emitted window in memory and when a late element arrives emit the whole window again.

The code I have is here: https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

Kostas Kloudas worked on extending it, so maybe he could share his version of the trigger as well.

Cheers,
Aljoscha

> On 01 Mar 2016, at 18:35, Michael Radford <[hidden email]> wrote:
>
> I'm evaluating Flink for a reporting application that will keep
> various aggregates updated in a database. It will be consuming from
> Kafka queues that are replicated from remote data centers, so in case
> there is a long outage in replication, I need to decide what to do
> about windowing and late data.
>
> If I use Flink's built-in windows and watermarks, any late data will
> be come in 1-element windows, which could overwhelm the database if a
> large batch of late data comes in and they are each mapped to
> individual database updates.
>
> As far as I can tell, I have two options:
>
> 1. Ignore late data, by marking it as late in an
> AssignerWithPunctuatedWatermarks function, and then discarding it in a
> flatMap operator. In this scenario, I would rely on a batch process to
> fill in the missing data later, in the lambda architecture style.
>
> 2. Implement my own watermark logic to allow full windows of late
> data. It seems like I could, for example, emit a "tick" message that
> is replicated to all partitions every n messages, and then a custom
> Trigger could decide when to purge each window based on the ticks and
> a timeout duration. The system would never emit a real Watermark.
>
> My questions are:
> - Am I mistaken about either of these, or are there any other options
> I'm not seeing for avoiding 1-element windows?
> - For option 2, are there any problems with not emitting actual
> watermarks, as long as the windows are eventually purged by a trigger?
>
> Thanks,
> Mike

Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Kostas Kloudas
Hello Mike,

The code that Aljiosha mentioned is here:


This allows you to specify a trigger like:

EventTimeTriggerWithEarlyAndLateFiring trigger =
EventTimeTriggerWithEarlyAndLateFiring.create()
.withEarlyFiringEvery(Time.minutes(10))
.withLateFiringEvery(Time.minutes(5))
.withAllowedLateness(Time.minutes(20))
.accumulating();

The means that it will fire every 10 minutes (in processing time) until the end of the window (event time), and then
every 5 minutes (processing time) for late elements up to 20 minutes late. In addition, previous elements are not discarded.

Hope this helps,
Kostas

On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <[hidden email]> wrote:

Hi,
I did some initial work on extending the EventTimeTrigger a bit to allow more complex behavior. Specifically, this allows setting an “allowed lateness” after which elements should no longer lead to windows being emitted. Also, it allows to specify to keep an emitted window in memory and when a late element arrives emit the whole window again.

The code I have is here: https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

Kostas Kloudas worked on extending it, so maybe he could share his version of the trigger as well.

Cheers,
Aljoscha
On 01 Mar 2016, at 18:35, Michael Radford <[hidden email]> wrote:

I'm evaluating Flink for a reporting application that will keep
various aggregates updated in a database. It will be consuming from
Kafka queues that are replicated from remote data centers, so in case
there is a long outage in replication, I need to decide what to do
about windowing and late data.

If I use Flink's built-in windows and watermarks, any late data will
be come in 1-element windows, which could overwhelm the database if a
large batch of late data comes in and they are each mapped to
individual database updates.

As far as I can tell, I have two options:

1. Ignore late data, by marking it as late in an
AssignerWithPunctuatedWatermarks function, and then discarding it in a
flatMap operator. In this scenario, I would rely on a batch process to
fill in the missing data later, in the lambda architecture style.

2. Implement my own watermark logic to allow full windows of late
data. It seems like I could, for example, emit a "tick" message that
is replicated to all partitions every n messages, and then a custom
Trigger could decide when to purge each window based on the ticks and
a timeout duration. The system would never emit a real Watermark.

My questions are:
- Am I mistaken about either of these, or are there any other options
I'm not seeing for avoiding 1-element windows?
- For option 2, are there any problems with not emitting actual
watermarks, as long as the windows are eventually purged by a trigger?

Thanks,
Mike


Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Michael Radford
Thank you, that was helpful. I didn't appreciate that a Trigger is
fully in control of when to fire / purge regardless of the watermark.

Now I am wondering the best way to distinguish different instances of
the same time window with completely different data, vs. repeated
fires that include data used in previous fires. More specifically:

- If the data is not late, I will hold onto an aggregate maintained by
a FoldFunction, and fire the window periodically without purging. Each
of these fires should carry some key that is the same for every fire,
so that a database update downstream can overwrite the previous
partial aggregates.

- If the data is late, I will only hold onto the aggregate for some
period, and then fire and purge. But this late fire should carry a
different key, so that  the aggregates from the non-late data are not
overwritten. That way, I am able to deal with arbitrarily late data
statelessly without knowing what aggregates for the time window have
already been written.

It looks like I could do this by using RichWindowFunction in the Java
API, and saving the key using the RuntimeContext state API. However, I
can't seem to pass a RichWindowFunction to the Scala WindowedStream's
apply method. Is there any easy way around this?

I was also hoping that the FoldFunction passed to WindowedStream.apply
could be a RichFoldFunction, but that is specifically prohibited for
some reason.

Any hints on how to make a stateful WindowedStream.apply in Scala
would be much appreciated.

Thanks,
Mike

On Wed, Mar 2, 2016 at 2:11 AM, Kostas Kloudas
<[hidden email]> wrote:

> Hello Mike,
>
> The code that Aljiosha mentioned is here:
>
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> This allows you to specify a trigger like:
>
> EventTimeTriggerWithEarlyAndLateFiring trigger =
> EventTimeTriggerWithEarlyAndLateFiring.create()
> .withEarlyFiringEvery(Time.minutes(10))
> .withLateFiringEvery(Time.minutes(5))
> .withAllowedLateness(Time.minutes(20))
> .accumulating();
>
> The means that it will fire every 10 minutes (in processing time) until the
> end of the window (event time), and then
> every 5 minutes (processing time) for late elements up to 20 minutes late.
> In addition, previous elements are not discarded.
>
> Hope this helps,
> Kostas
>
> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi,
> I did some initial work on extending the EventTimeTrigger a bit to allow
> more complex behavior. Specifically, this allows setting an “allowed
> lateness” after which elements should no longer lead to windows being
> emitted. Also, it allows to specify to keep an emitted window in memory and
> when a late element arrives emit the whole window again.
>
> The code I have is here:
> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
>
> Kostas Kloudas worked on extending it, so maybe he could share his version
> of the trigger as well.
>
> Cheers,
> Aljoscha
>
> On 01 Mar 2016, at 18:35, Michael Radford <[hidden email]> wrote:
>
> I'm evaluating Flink for a reporting application that will keep
> various aggregates updated in a database. It will be consuming from
> Kafka queues that are replicated from remote data centers, so in case
> there is a long outage in replication, I need to decide what to do
> about windowing and late data.
>
> If I use Flink's built-in windows and watermarks, any late data will
> be come in 1-element windows, which could overwhelm the database if a
> large batch of late data comes in and they are each mapped to
> individual database updates.
>
> As far as I can tell, I have two options:
>
> 1. Ignore late data, by marking it as late in an
> AssignerWithPunctuatedWatermarks function, and then discarding it in a
> flatMap operator. In this scenario, I would rely on a batch process to
> fill in the missing data later, in the lambda architecture style.
>
> 2. Implement my own watermark logic to allow full windows of late
> data. It seems like I could, for example, emit a "tick" message that
> is replicated to all partitions every n messages, and then a custom
> Trigger could decide when to purge each window based on the ticks and
> a timeout duration. The system would never emit a real Watermark.
>
> My questions are:
> - Am I mistaken about either of these, or are there any other options
> I'm not seeing for avoiding 1-element windows?
> - For option 2, are there any problems with not emitting actual
> watermarks, as long as the windows are eventually purged by a trigger?
>
> Thanks,
> Mike
>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Michael Radford
Ha, never mind, I realized I can just put the unique key into the
aggregate object maintained by the FoldFunction.

I'm still curious why RichWindowFunction (and RichFoldFunction) aren't
supported for Scala WindowedStream.apply.

Mike

On Thu, Mar 3, 2016 at 4:50 PM, Michael Radford <[hidden email]> wrote:

> Thank you, that was helpful. I didn't appreciate that a Trigger is
> fully in control of when to fire / purge regardless of the watermark.
>
> Now I am wondering the best way to distinguish different instances of
> the same time window with completely different data, vs. repeated
> fires that include data used in previous fires. More specifically:
>
> - If the data is not late, I will hold onto an aggregate maintained by
> a FoldFunction, and fire the window periodically without purging. Each
> of these fires should carry some key that is the same for every fire,
> so that a database update downstream can overwrite the previous
> partial aggregates.
>
> - If the data is late, I will only hold onto the aggregate for some
> period, and then fire and purge. But this late fire should carry a
> different key, so that  the aggregates from the non-late data are not
> overwritten. That way, I am able to deal with arbitrarily late data
> statelessly without knowing what aggregates for the time window have
> already been written.
>
> It looks like I could do this by using RichWindowFunction in the Java
> API, and saving the key using the RuntimeContext state API. However, I
> can't seem to pass a RichWindowFunction to the Scala WindowedStream's
> apply method. Is there any easy way around this?
>
> I was also hoping that the FoldFunction passed to WindowedStream.apply
> could be a RichFoldFunction, but that is specifically prohibited for
> some reason.
>
> Any hints on how to make a stateful WindowedStream.apply in Scala
> would be much appreciated.
>
> Thanks,
> Mike
>
> On Wed, Mar 2, 2016 at 2:11 AM, Kostas Kloudas
> <[hidden email]> wrote:
>> Hello Mike,
>>
>> The code that Aljiosha mentioned is here:
>>
>> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>>
>> This allows you to specify a trigger like:
>>
>> EventTimeTriggerWithEarlyAndLateFiring trigger =
>> EventTimeTriggerWithEarlyAndLateFiring.create()
>> .withEarlyFiringEvery(Time.minutes(10))
>> .withLateFiringEvery(Time.minutes(5))
>> .withAllowedLateness(Time.minutes(20))
>> .accumulating();
>>
>> The means that it will fire every 10 minutes (in processing time) until the
>> end of the window (event time), and then
>> every 5 minutes (processing time) for late elements up to 20 minutes late.
>> In addition, previous elements are not discarded.
>>
>> Hope this helps,
>> Kostas
>>
>> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <[hidden email]> wrote:
>>
>> Hi,
>> I did some initial work on extending the EventTimeTrigger a bit to allow
>> more complex behavior. Specifically, this allows setting an “allowed
>> lateness” after which elements should no longer lead to windows being
>> emitted. Also, it allows to specify to keep an emitted window in memory and
>> when a late element arrives emit the whole window again.
>>
>> The code I have is here:
>> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
>>
>> Kostas Kloudas worked on extending it, so maybe he could share his version
>> of the trigger as well.
>>
>> Cheers,
>> Aljoscha
>>
>> On 01 Mar 2016, at 18:35, Michael Radford <[hidden email]> wrote:
>>
>> I'm evaluating Flink for a reporting application that will keep
>> various aggregates updated in a database. It will be consuming from
>> Kafka queues that are replicated from remote data centers, so in case
>> there is a long outage in replication, I need to decide what to do
>> about windowing and late data.
>>
>> If I use Flink's built-in windows and watermarks, any late data will
>> be come in 1-element windows, which could overwhelm the database if a
>> large batch of late data comes in and they are each mapped to
>> individual database updates.
>>
>> As far as I can tell, I have two options:
>>
>> 1. Ignore late data, by marking it as late in an
>> AssignerWithPunctuatedWatermarks function, and then discarding it in a
>> flatMap operator. In this scenario, I would rely on a batch process to
>> fill in the missing data later, in the lambda architecture style.
>>
>> 2. Implement my own watermark logic to allow full windows of late
>> data. It seems like I could, for example, emit a "tick" message that
>> is replicated to all partitions every n messages, and then a custom
>> Trigger could decide when to purge each window based on the ticks and
>> a timeout duration. The system would never emit a real Watermark.
>>
>> My questions are:
>> - Am I mistaken about either of these, or are there any other options
>> I'm not seeing for avoiding 1-element windows?
>> - For option 2, are there any problems with not emitting actual
>> watermarks, as long as the windows are eventually purged by a trigger?
>>
>> Thanks,
>> Mike
>>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

shikhar
In case this helps, this is a Scala helper I am using to filter out late data on a KeyedStream. The last timestamp state is maintained at the key-level.

```
  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {

    def filterStrictlyAscendingTime(timestampExtractor: T => Long)(outOfOrderHandler: (T, Long) => Unit): DataStream[T] = {
      stream.filterWithState((currentElement: T, prevElementTimestamp: Option[Long]) => {
        val currentElementTimestamp = timestampExtractor(currentElement)
        prevElementTimestamp match {
          case None =>
            (true, Some(currentElementTimestamp))
          case Some(t) =>
            if (currentElementTimestamp > t) {
              (true, Some(currentElementTimestamp))
            } else {
              outOfOrderHandler(currentElement, t)
              (false, Some(t))
            }
        }
      })
    }

    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] = {
      stream.filterStrictlyAscendingTime(timestampExtractor) {
        (element, timestamp) => {
          // FLINK-2870 should provide a more idiomatic way to ignore late arrivals
        }
      }
    }

  }
```
Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Aljoscha Krettek
Hi Mike,
RichWindowFunction not being supported for the Scala API is an oversight on our side. We’re working to fix it.

For ReduceFunction and FoldFunction it’s a bit more tricky, since they are right now not proper operator functions but only used inside the internal window state to incrementally combine the elements.

Cheers,
Aljoscha

> On 04 Mar 2016, at 03:20, shikhar <[hidden email]> wrote:
>
> In case this helps, this is a Scala helper I am using to filter out late data
> on a KeyedStream. The last timestamp state is maintained at the key-level.
>
> ```
>  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {
>
>    def filterStrictlyAscendingTime(timestampExtractor: T =>
> Long)(outOfOrderHandler: (T, Long) => Unit): DataStream[T] = {
>      stream.filterWithState((currentElement: T, prevElementTimestamp:
> Option[Long]) => {
>        val currentElementTimestamp = timestampExtractor(currentElement)
>        prevElementTimestamp match {
>          case None =>
>            (true, Some(currentElementTimestamp))
>          case Some(t) =>
>            if (currentElementTimestamp > t) {
>              (true, Some(currentElementTimestamp))
>            } else {
>              outOfOrderHandler(currentElement, t)
>              (false, Some(t))
>            }
>        }
>      })
>    }
>
>    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] = {
>      stream.filterStrictlyAscendingTime(timestampExtractor) {
>        (element, timestamp) => {
>          // FLINK-2870 should provide a more idiomatic way to ignore late
> arrivals
>        }
>      }
>    }
>
>  }
> ```
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Reply | Threaded
Open this post in threaded view
|

Re: Windows, watermarks, and late data

Stephan Ewen
We have a fix open for support of the RichFunctions. Later today, the 1.0-SNAPSHOT and 1.1-SNAPSHOT will probably support that.

Seems we need to get a 1.0.1 out very soon, because this is a quite important fix.

On Fri, Mar 4, 2016 at 12:37 PM, Aljoscha Krettek <[hidden email]> wrote:
Hi Mike,
RichWindowFunction not being supported for the Scala API is an oversight on our side. We’re working to fix it.

For ReduceFunction and FoldFunction it’s a bit more tricky, since they are right now not proper operator functions but only used inside the internal window state to incrementally combine the elements.

Cheers,
Aljoscha
> On 04 Mar 2016, at 03:20, shikhar <[hidden email]> wrote:
>
> In case this helps, this is a Scala helper I am using to filter out late data
> on a KeyedStream. The last timestamp state is maintained at the key-level.
>
> ```
>  implicit class StrictlyAscendingByTime[T, K](stream: KeyedStream[T, K]) {
>
>    def filterStrictlyAscendingTime(timestampExtractor: T =>
> Long)(outOfOrderHandler: (T, Long) => Unit): DataStream[T] = {
>      stream.filterWithState((currentElement: T, prevElementTimestamp:
> Option[Long]) => {
>        val currentElementTimestamp = timestampExtractor(currentElement)
>        prevElementTimestamp match {
>          case None =>
>            (true, Some(currentElementTimestamp))
>          case Some(t) =>
>            if (currentElementTimestamp > t) {
>              (true, Some(currentElementTimestamp))
>            } else {
>              outOfOrderHandler(currentElement, t)
>              (false, Some(t))
>            }
>        }
>      })
>    }
>
>    def ignoreLateArrivals(timestampExtractor: T => Long): DataStream[T] = {
>      stream.filterStrictlyAscendingTime(timestampExtractor) {
>        (element, timestamp) => {
>          // FLINK-2870 should provide a more idiomatic way to ignore late
> arrivals
>        }
>      }
>    }
>
>  }
> ```
>
>
>
> --
> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Windows-watermarks-and-late-data-tp5239p5291.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.