Event-time and first watermark

Gwenhael Pasquiers

Hi,

From my tests, it seems that the initial watermark value is Long.MIN_VALUE, even though my first element passed through the timestamp extractor before reaching my ProcessFunction. It looks like the watermark "lags" behind the data by one message.

Is there a way to get a more "up-to-date" watermark? Or is the only way to compute it myself in my ProcessFunction?

Thanks.


Re: Event-time and first watermark

Nico Kruber
Hi Gwenhael,
"A Watermark(t) declares that event time has reached time t in that stream,
meaning that there should be no more elements from the stream with a timestamp
t’ <= t (i.e. events with timestamps older or equal to the watermark)." [1]

Therefore, a watermark should trail the actual event with timestamp t.
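A minimal sketch of the quoted contract, assuming a stream can be modelled as an ordered list of records and watermarks. StreamItem and WatermarkContract are hypothetical classes for illustration, not Flink types: the check reports the first record that arrives with a timestamp at or below an earlier watermark.

```java
import java.util.List;

// Hypothetical stream item: either a record with an event timestamp or a watermark.
final class StreamItem {
    final boolean isWatermark;
    final long timestamp;

    private StreamItem(boolean isWatermark, long timestamp) {
        this.isWatermark = isWatermark;
        this.timestamp = timestamp;
    }

    static StreamItem record(long ts) { return new StreamItem(false, ts); }

    static StreamItem watermark(long ts) { return new StreamItem(true, ts); }
}

final class WatermarkContract {
    // Returns the index of the first record with timestamp t' <= t for some
    // earlier Watermark(t), or -1 if the sequence respects the contract.
    static int firstViolation(List<StreamItem> items) {
        boolean watermarkSeen = false;
        long currentWatermark = Long.MIN_VALUE;
        for (int i = 0; i < items.size(); i++) {
            StreamItem item = items.get(i);
            if (item.isWatermark) {
                // Watermarks only move forward, so keep the largest one seen.
                currentWatermark = Math.max(currentWatermark, item.timestamp);
                watermarkSeen = true;
            } else if (watermarkSeen && item.timestamp <= currentWatermark) {
                return i; // a record arrived after a watermark that covered it
            }
        }
        return -1;
    }
}
```

In this model, record(5) followed by watermark(5) is valid, while watermark(5) followed by record(5) is reported as a violation at index 1.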

What is it that you want to achieve in the end? What do you want to use the
watermark for? They are basically a means of defining when an event-time window
ends.
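To illustrate the window case, here is a simplified model, assuming a window that covers [start, end): it can be evaluated once the watermark reaches end - 1, which is how Flink's TimeWindow defines maxTimestamp(). The EventTimeWindow class itself is hypothetical.

```java
// Hypothetical event-time window covering [start, end).
final class EventTimeWindow {
    final long start; // inclusive
    final long end;   // exclusive

    EventTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Last timestamp that still belongs to the window.
    long maxTimestamp() {
        return end - 1;
    }

    // True once the watermark declares that no further element for this
    // window can arrive, i.e. the window can be evaluated.
    boolean isComplete(long currentWatermark) {
        return currentWatermark >= maxTimestamp();
    }
}
```

With the initial watermark (Long.MIN_VALUE), no window is ever complete; a window [0, 1000) completes at watermark 999.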


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#event-time-and-watermarks

On Thursday, 3 August 2017 10:24:35 CEST Gwenhael Pasquiers wrote:


RE: Event-time and first watermark

Gwenhael Pasquiers
We're not using a window but a more basic ProcessFunction to handle sessions. We made this choice because we have to handle millions of sessions that can last anywhere from 10 seconds to 24 hours, so we wanted to manage them manually using Flink state.

We're using the watermark as an event-time "clock" to:
* compute the "lateness" of a message relative to the watermark (i.e. the timestamp of the most recent message in the stream)
* fire timers

We're using event time instead of processing time because our data arrive late, in hourly bursts.

Maybe we're misusing the watermark?

B.R.
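The "clock" usage described above can be sketched with a small model. EventTimeClock is a hypothetical class, not Flink code: the watermark starts at Long.MIN_VALUE, and a timer registered for time t fires once a watermark >= t arrives, which is the rule Flink applies to event-time timers. Registering the same time twice yields a single timer in this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical event-time timer service in which the watermark is the clock.
final class EventTimeClock {
    // Flink's initial watermark value, i.e. "no watermark received yet".
    private long currentWatermark = Long.MIN_VALUE;
    // Sorted set: timers fire in time order and duplicates collapse.
    private final TreeSet<Long> timers = new TreeSet<>();

    long currentWatermark() {
        return currentWatermark;
    }

    void registerEventTimeTimer(long time) {
        timers.add(time);
    }

    // Moves the clock forward and returns every timer whose time is <= the
    // new watermark, oldest first.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        if (watermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

Because nothing fires until a watermark arrives, timers registered while processing the very first element cannot fire before the second element's watermark comes through.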

-----Original Message-----
From: Nico Kruber
Sent: Thursday, 3 August 2017 16:30
Cc: Gwenhael Pasquiers
Subject: Re: Event-time and first watermark


Re: Event-time and first watermark

Aljoscha Krettek
Hi,

How are you defining the watermark, i.e. what kind of watermark extractor are you using?

Best,
Aljoscha

On 3 Aug 2017, at 17:45, Gwenhael Pasquiers wrote:

RE: Event-time and first watermark

Gwenhael Pasquiers
We're using an AssignerWithPunctuatedWatermarks that extracts a timestamp from the data. It keeps track of the highest timestamp it has ever seen and returns a new Watermark every time that value grows.

I know it's bad for performance, but for the moment that's not the issue: I want the most up-to-date watermark possible.
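A sketch of the assigner logic described above, written as plain Java so it runs without Flink. MaxTimestampAssigner and SessionEvent are hypothetical names; the two methods only mirror the callbacks of AssignerWithPunctuatedWatermarks (extractTimestamp and checkAndGetNextWatermark) rather than implementing that interface. Returning null stands for "emit no watermark", as in Flink's API.

```java
// Hypothetical event type carrying an event timestamp.
final class SessionEvent {
    final long timestamp;

    SessionEvent(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Mirrors the punctuated assigner callbacks without the Flink interface.
final class MaxTimestampAssigner {
    // Highest timestamp seen so far; Long.MIN_VALUE until the first element.
    private long maxTimestamp = Long.MIN_VALUE;

    // Counterpart of extractTimestamp(element, previousElementTimestamp).
    long extractTimestamp(SessionEvent event) {
        return event.timestamp;
    }

    // Counterpart of checkAndGetNextWatermark(lastElement, extractedTimestamp):
    // returns a new watermark only when the maximum grows, otherwise null.
    Long checkAndGetNextWatermark(SessionEvent lastElement, long extractedTimestamp) {
        if (extractedTimestamp > maxTimestamp) {
            maxTimestamp = extractedTimestamp;
            return maxTimestamp;
        }
        return null;
    }
}
```

By the contract quoted earlier in the thread, a watermark equal to the highest timestamp seen already declares any later element carrying that same timestamp late; emitting maxTimestamp - 1 instead would avoid that, at the cost of lagging by one time unit.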

-----Original Message-----
From: Aljoscha Krettek
Sent: Friday, 4 August 2017 12:22
To: Gwenhael Pasquiers
Cc: Nico Kruber
Subject: Re: Event-time and first watermark


Re: Event-time and first watermark

Aljoscha Krettek
I see. But yes, even in that case the watermark will always be "one behind". The logic in the extraction operator is roughly this:

 1. Extract timestamp T, assign to internal StreamRecord
 2. Send StreamRecord downstream
 3. Extract Watermark W
 4. Send Watermark downstream

(In your case T == W)

The reason is that a watermark T says that there will be no element with a timestamp <= T in the future. If the watermark were sent before the record, this would violate the watermark contract: your element with timestamp T would arrive after the watermark W. Unfortunately, I don't think it's easily possible to have a properly defined watermark for the very first element in a stream.
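The four steps above can be simulated in a few lines to show why the first element sees Long.MIN_VALUE. This is a simplified, single-threaded model, not Flink's operator code; OneBehindDemo is a hypothetical name, and the watermark is assumed to equal the highest timestamp seen (T == W, as in this case).

```java
import java.util.ArrayList;
import java.util.List;

final class OneBehindDemo {
    // For each input timestamp, returns the watermark visible downstream at
    // the moment that element is processed.
    static List<Long> watermarksSeenByDownstream(long[] timestamps) {
        long downstreamWatermark = Long.MIN_VALUE; // initial watermark
        long maxTimestamp = Long.MIN_VALUE;
        List<Long> seen = new ArrayList<>();
        for (long t : timestamps) {
            // Steps 1-2: the record with timestamp T goes downstream first,
            // so the downstream function still sees the previous watermark.
            seen.add(downstreamWatermark);
            // Steps 3-4: watermark W (here the highest T so far) follows it.
            if (t > maxTimestamp) {
                maxTimestamp = t;
                downstreamWatermark = maxTimestamp;
            }
        }
        return seen;
    }
}
```

For input timestamps 10, 20 and 30, the downstream function observes Long.MIN_VALUE, 10 and 20 respectively: each element sees the watermark produced after its predecessor.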

Best,
Aljoscha

On 4 Aug 2017, at 16:43, Gwenhael Pasquiers wrote:

RE: Event-time and first watermark

Gwenhael Pasquiers
Hi,

I'm late, but thanks for your answer. Anyway, we made a special case for the first watermark (if (watermark == Long.MIN_VALUE) ...).

At least we now know that we didn't do anything wrong. Maybe this special case of the first watermark is worth mentioning in your documentation?
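A sketch of such a special case, applied to the lateness computation mentioned earlier in the thread, with Long.MIN_VALUE treated as "no watermark yet". The Lateness helper and its signed-difference convention are assumptions for illustration. The guard also matters arithmetically: in Java, Long.MIN_VALUE - timestamp silently overflows for any positive timestamp.

```java
import java.util.OptionalLong;

final class Lateness {
    // Flink's initial watermark: nothing has been declared complete yet.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Signed distance between the current watermark and the event timestamp.
    // A result >= 0 means the event is late under the contract (t' <= t);
    // empty means no watermark has arrived yet, so nothing can be late.
    static OptionalLong lateness(long eventTimestamp, long currentWatermark) {
        if (currentWatermark == NO_WATERMARK) {
            // Special case for the first element(s): without it,
            // Long.MIN_VALUE - eventTimestamp wraps around to a huge
            // positive number for any positive timestamp.
            return OptionalLong.empty();
        }
        return OptionalLong.of(currentWatermark - eventTimestamp);
    }
}
```

Returning an empty result rather than 0 keeps "no watermark yet" distinguishable from "exactly on the watermark".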

-----Original Message-----
From: Aljoscha Krettek
Sent: Tuesday, 8 August 2017 16:50
To: Gwenhael Pasquiers
Cc: Nico Kruber
Subject: Re: Event-time and first watermark


Re: Event-time and first watermark

Aljoscha Krettek
Coming back to this after a while: where would you have expected such a note? Unfortunately, the information about watermarks and ProcessFunction is somewhat spread across the documentation.

If you could point me to where you would expect it, that would be very helpful.

Best,
Aljoscha

On 29 Aug 2017, at 10:22, Gwenhael Pasquiers wrote: