"Fill in" notification messages based on event time watermark

Manas Kale
Hi,
I have an upstream operator that outputs device state transition messages with event timestamps. That is, it only emits output when a transition takes place.
For example, 
state1 @ 1 PM
state2 @ 2 PM
and so on. 

Using a downstream operator, I want to emit notification messages at some configured periodicity. For example, if periodicity = 20 min, in the above scenario this operator will output:
state1 notification @ 1PM
state1 notification @ 1.20PM
state1 notification @ 1.40PM
 ...

Now, the main issue is that I want this to be driven by the watermark and not by the transition events received from upstream. That is, I would like to see notification events as soon as the watermark crosses their timestamps, not when the next transition event arrives at the operator (which could be hours later, as above).
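To make the expected behaviour concrete, here is a minimal plain-Java sketch of which fill-in notifications for one state are due at a given watermark. It has no Flink dependency, and the class and method names are hypothetical. It assumes that notifications start at the transition time itself and repeat every period until the next transition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which "fill-in" notifications are due for one state.
class FillInNotifications {

    // Notification timestamps for a state that began at transitionTs, repeated every periodMs.
    // Only timestamps before the next transition (exclusive) and already reached by the
    // watermark (an event-time timer for t fires once watermark >= t) are returned.
    // Pass Long.MAX_VALUE as nextTransitionTs while no later transition has been seen.
    static List<Long> dueNotifications(long transitionTs, long periodMs,
                                       long nextTransitionTs, long watermark) {
        List<Long> due = new ArrayList<>();
        for (long t = transitionTs; t < nextTransitionTs && t <= watermark; t += periodMs) {
            due.add(t);
        }
        return due;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long onePm = 13 * 60 * minute;
        // state1 @ 1 PM, no later transition seen yet, period 20 min, watermark at 1:45 PM
        for (long t : dueNotifications(onePm, 20 * minute, Long.MAX_VALUE, onePm + 45 * minute)) {
            System.out.println("state1 notification @ 1 PM + " + (t - onePm) / minute + " min");
        }
    }
}
```

With the values in `main`, it prints the 1 PM, 1:20 PM and 1:40 PM notifications (offsets 0, 20 and 40 min). The next one only becomes due once the watermark reaches 2 PM, whether or not another transition has arrived.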

My first solution, using a KeyedProcessFunction and timers, did not work as expected because the order in which transition events arrived at this operator was non-deterministic. To elaborate, assume a setAutoWatermarkInterval of 10 seconds.
If we get the transition events:
state1 @ 1 sec
state2 @ 3 sec
state3 @ 5 sec
state1 @ 8 sec
the order in which these events arrived at my KeyedProcessFunction was not fixed. To solve this, these messages need to be sorted by event time, which led me to my second solution.
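One common way to restore event-time order inside a keyed function is to buffer elements and release them only once the watermark has passed their timestamps. The plain-Java sketch below models that idea. A PriorityQueue stands in for Flink keyed state; in a real job this might be, for example, a MapState keyed by timestamp plus an event-time timer per timestamp. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class EventTimeSorter {
    static final class Transition {
        final String state;
        final long timestamp;
        Transition(String state, long timestamp) { this.state = state; this.timestamp = timestamp; }
        @Override public String toString() { return state + " @ " + timestamp; }
    }

    // Buffered transitions ordered by event timestamp (stand-in for keyed state)
    private final PriorityQueue<Transition> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private long currentWatermark = Long.MIN_VALUE;
    final List<Transition> late = new ArrayList<>();

    // Buffer an element unless the watermark has already passed its timestamp.
    void onElement(Transition t) {
        if (t.timestamp <= currentWatermark) {
            late.add(t);
        } else {
            buffer.add(t);
        }
    }

    // Release, in timestamp order, every buffered transition the watermark now covers.
    List<Transition> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Transition> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= currentWatermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

Feeding the four transitions above in any arrival order and then a watermark of 10 releases them as 1, 3, 5, 8. Elements with equal timestamps come out in no guaranteed order.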

My second solution, using an event-time tumbling window (TumblingEventTimeWindows) with size = setAutoWatermarkInterval, also does not work. I sorted the accumulated events in the window and applied the notification-generation logic to them in order. However, I assumed that windows are created even if there are no elements. Since this is not the case, this solution generates notifications only when the next state transition message arrives, which could be hours later.
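You can see why no window fires during a gap by looking at how tumbling windows are assigned. The sketch below is a simplified model with zero window offset; it is not Flink's own code, and its names are hypothetical. It shows that a window exists only if some element's timestamp falls into it.

```java
import java.util.TreeSet;

class LazyWindows {
    // Start of the tumbling window of length sizeMs that contains ts (zero offset)
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    // Windows are only created for timestamps that actually occur, so a gap
    // between transitions produces no windows, and therefore no window firings.
    static TreeSet<Long> windowsCreated(long[] timestamps, long sizeMs) {
        TreeSet<Long> starts = new TreeSet<>();
        for (long ts : timestamps) {
            starts.add(windowStart(ts, sizeMs));
        }
        return starts;
    }
}
```

Take timestamps 1, 3, 5, 8 and 3601 with 10-unit windows. Only the windows starting at 0 and 3600 are created, so nothing is emitted for the span in between.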

Does anyone have any suggestions on how I can implement this?
Thanks!



Re: "Fill in" notification messages based on event time watermark

Piotr Nowojski-3
Hi,

I’m not sure, but I don’t think there is an existing window that would do exactly what you want. I would suggest going back to the `KeyedProcessFunction` (or a custom operator?) and keeping a `MapState<TimeStamp, StateWithTimeStamp> currentStates` field. The key would be, for example, the timestamp of the beginning of your window. The value would be the latest state in that time window, annotated with the timestamp at which this state was recorded.

On each element:

1. Determine the window’s begin timestamp (the key of the map).
2. If it’s the first element for that window, register an event-time timer to publish the results at that window’s end timestamp.
3. Check whether `currentStates` should be modified (if your new element is newer than the stored value, or is the first value for the given key).

On event-time timer firing:
1. Output the state matching this timer.
2. Check if there is a (more recent) value for the next window, and if not:
   3. copy the value to the next window;
   4. register a timer for that window to fire.
5. Clean up `currentStates` by removing the value for the no-longer-needed key.
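You can simulate the steps above without Flink. The sketch below is plain Java for a single key. A TreeMap stands in for `MapState` and a TreeSet stands in for the event-time timer service. `FillInSimulation`, `advanceWatermark` and the late-element check are hypothetical additions, not part of the outline. In a real `KeyedProcessFunction`, you would register the timer with `ctx.timerService().registerEventTimeTimer(...)`, and the `onTimer` logic would run when Flink fires it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

class FillInSimulation {
    static final class StateWithTimestamp {
        final String state;
        final long timestamp;
        StateWithTimestamp(String state, long timestamp) { this.state = state; this.timestamp = timestamp; }
    }

    private final long windowSize;
    // Stand-in for MapState<window start, latest state in that window>
    private final TreeMap<Long, StateWithTimestamp> currentStates = new TreeMap<>();
    // Stand-in for the event-time timer service: pending timer timestamps (deduplicated)
    private final TreeSet<Long> timers = new TreeSet<>();
    private long currentWatermark = Long.MIN_VALUE;
    final List<String> output = new ArrayList<>();
    final List<String> late = new ArrayList<>();

    FillInSimulation(long windowSize) { this.windowSize = windowSize; }

    private long windowStart(long ts) { return ts - Math.floorMod(ts, windowSize); }

    // "On each element"
    void processElement(String state, long ts) {
        long start = windowStart(ts);                               // step 1
        if (start + windowSize - 1 <= currentWatermark) {           // window already published
            late.add(state + " @ " + ts);
            return;
        }
        StateWithTimestamp current = currentStates.get(start);
        if (current == null) {
            timers.add(start + windowSize - 1);                     // step 2: timer at window end
        }
        if (current == null || ts >= current.timestamp) {          // step 3: keep the newest state
            currentStates.put(start, new StateWithTimestamp(state, ts));
        }
    }

    // Fire every pending timer the watermark has reached, in timestamp order
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        while (!timers.isEmpty() && timers.first() <= currentWatermark) {
            onTimer(timers.pollFirst());
        }
    }

    // "On event-time timer firing"
    private void onTimer(long timer) {
        long start = timer - windowSize + 1;
        long end = start + windowSize;
        StateWithTimestamp s = currentStates.get(start);
        output.add(s.state + " @ " + end);                         // step 1: publish at window end
        if (!currentStates.containsKey(end)) {                     // step 2: nothing in next window?
            currentStates.put(end, s);                             // step 3: carry the state forward
            timers.add(end + windowSize - 1);                      // step 4: timer for next window
        }
        currentStates.remove(start);                               // step 5: drop the old key
    }
}
```

Calling `processElement("state1", 100)` and then `advanceWatermark(129)` with a window size of 10 yields `state1 @ 110`, `state1 @ 120` and `state1 @ 130` with no further input. A subsequent `processElement("state2", 125)` is recorded as late because the window ending at 130 has already been published. Note that every key re-arms a timer on each firing, so its state and timers live until they are explicitly cleared.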

I hope this helps

Piotrek 

Re: "Fill in" notification messages based on event time watermark

David Anderson-2
Following up on Piotr's outline, there's an example in the documentation of how to use a KeyedProcessFunction to implement an event-time tumbling window [1]. Perhaps that can help you get started.
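The core arithmetic of a tumbling window built on a KeyedProcessFunction has two parts. First, round each event's timestamp up to the last millisecond of its window. Second, register an event-time timer there. The helper below sketches that idea in plain Java. It follows the spirit of the linked documentation example but is not copied from it, and its names are hypothetical. Inside Flink, you would pass the returned value to `ctx.timerService().registerEventTimeTimer(...)` and compare it against `ctx.timerService().currentWatermark()`.

```java
class TumblingWindowMath {
    // Last millisecond (inclusive) of the tumbling window of length durationMs containing eventTime.
    // A timer registered for this value fires once the watermark reaches the end of the window.
    static long endOfWindow(long eventTime, long durationMs) {
        return eventTime - Math.floorMod(eventTime, durationMs) + durationMs - 1;
    }

    // An element can no longer join its window once that window's timer has fired,
    // i.e. once the current watermark has reached the window's last timestamp.
    static boolean windowAlreadyFired(long eventTime, long durationMs, long currentWatermark) {
        return endOfWindow(eventTime, durationMs) <= currentWatermark;
    }

    public static void main(String[] args) {
        System.out.println(endOfWindow(125, 10));              // 129
        System.out.println(windowAlreadyFired(125, 10, 130));  // true
    }
}
```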

Regards,
David

[1] https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example


Re: "Fill in" notification messages based on event time watermark

Manas Kale
Hi David and Piotrek,
Thank you both for your inputs. 
I tried an implementation based on the algorithm Piotrek suggested and David's example. Although notifications are now generated as the watermark advances, some transition events are received after the watermark has already passed their timestamps. For example:
state1 @ 100
notification state1 @ 110
notification state1 @ 120
notification state1 @ 130    <----- shouldn't have emitted this
state2 @ 125                 <----- watermark is > 125 at this stage

I think something might be subtly wrong with how I have structured the upstream operators. The allowed lateness is 0 in the upstream watermark assigner, and I generate watermarks every x seconds.
The operator that emits state transitions is built with the tumbling-window approach I described in my first e-mail (so that I can compute at every watermark update). I can use this approach for the state-transition operator because it only needs to emit transitions, and nothing in between.
So, two questions:
1. Any idea what might be causing this incorrect watermark behaviour?
2. If I want to perform some computation only when the watermark updates, is an event-time tumbling window (TumblingEventTimeWindows) aligned with the watermark interval (meaning windowDuration = watermarkUpdateInterval) the correct way to do this?


Regards,
Manas


Re: "Fill in" notification messages based on event time watermark

Timo Walther
Hi Manas,

Reg. 1: I would recommend using a debugger in your IDE to check which
watermarks are travelling through your operators.

Reg. 2: All event-time operations are only performed once the watermark
has arrived from all parallel upstream instances. So, roughly speaking,
in machine time you can assume that the window is computed in watermark
update intervals. However, "what is computed" depends on the timestamps
of your events and how those are assigned to windows.
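Timo's point about parallel instances can be modelled like this. An operator's event-time clock is the minimum of the latest watermarks received on each of its input channels, so it only advances once every upstream instance has advanced. This is a simplified sketch with hypothetical names, and it ignores Flink's handling of idle inputs.

```java
import java.util.Arrays;

class CombinedWatermark {
    private final long[] perChannel;      // latest watermark seen on each input channel
    private long current = Long.MIN_VALUE;

    CombinedWatermark(int numChannels) {
        perChannel = new long[numChannels];
        Arrays.fill(perChannel, Long.MIN_VALUE);
    }

    // Record a watermark from one upstream instance. Returns true only if the operator's
    // own event-time clock advanced, which is when event-time timers and windows can fire.
    boolean onWatermark(int channel, long watermark) {
        perChannel[channel] = Math.max(perChannel[channel], watermark);
        long min = Arrays.stream(perChannel).min().getAsLong();
        if (min > current) {
            current = min;
            return true;
        }
        return false;
    }

    long current() { return current; }
}
```

With two inputs, a watermark of 100 on the first channel alone does not move the operator's clock. It advances to 50 when the second channel reports 50, and to 100 only when the second channel passes 100.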

I hope this helps a bit.

Regards,
Timo

Re: "Fill in" notification messages based on event time watermark

Piotr Nowojski-3
Hi Manas,

Adding to Timo's response: if you don’t have unit or integration tests yet, I would strongly recommend setting them up, as they make debugging and testing much easier. You can read how to do this for your functions and operators here [1] and here [2].

Piotrek


Re: "Fill in" notification messages based on event time watermark

Manas Kale
Hi Timo and Piotrek,
Thank you for the suggestions. 
I have been trying to set up unit tests at the operator granularity, and the blog post's testHarness examples certainly help a lot in this regard.

I understood my problem: an upstream session window operator can only report the end of a session window once the watermark has passed {lastObservedEvent + sessionTimeout}. However, my watermark was being updated periodically without taking this into account. It seems I will have to delay this notification operator's watermark by sessionTimeout.
Another complication is that this sessionTimeout is per-key, so I guess I will have to implement a watermark assigner that extracts the delay period from the data (similar to DynamicEventTimeSessionWindows).
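Flink watermarks are not tracked per key. One alternative to a custom watermark assigner is therefore to apply the per-key delay inside the notification operator itself; this is an assumption here, not something settled in this thread. The operator would treat `currentWatermark() - sessionTimeout` as that key's clock and register timers shifted by the same timeout. The plain-Java helpers below (hypothetical names) sketch the arithmetic. They assume a transition's timestamp is never earlier than the last event of the session that produced it.

```java
class PerKeySessionDelay {
    // Event time as seen by one key: the operator's watermark held back by that key's timeout.
    static long delayedWatermark(long operatorWatermark, long sessionTimeoutMs) {
        // Avoid underflow while the watermark is still at its initial Long.MIN_VALUE.
        if (operatorWatermark < Long.MIN_VALUE + sessionTimeoutMs) {
            return Long.MIN_VALUE;
        }
        return operatorWatermark - sessionTimeoutMs;
    }

    // A transition counts as late only if the delayed clock has already passed its timestamp.
    static boolean isLate(long transitionTs, long operatorWatermark, long sessionTimeoutMs) {
        return transitionTs <= delayedWatermark(operatorWatermark, sessionTimeoutMs);
    }

    // Event-time timer to register so that the notification for notificationTs
    // fires when the delayed clock (not the raw watermark) reaches it.
    static long timerFor(long notificationTs, long sessionTimeoutMs) {
        return notificationTs + sessionTimeoutMs;
    }
}
```

Take a 10-unit timeout. The `state2 @ 125` transition from the earlier trace is no longer late at an operator watermark of 130, and the notification for 130 waits for a timer at 140. The trade-off is that every notification for a key is delayed by that key's session timeout.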

Also, if I do implement such an assigner, would it be helpful to add it to Flink? I am happy to contribute if so. Any other comments/observations are also welcome!

Thank you all for the help,
Manas


On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <[hidden email]> wrote:
Hi Manas,

Adding to the response from Timo, if you don’t have unit tests/integration tests, I would strongly recommend setting them up, as it makes debugging and testing easier. You can read how to do it for your functions and operators here [1] and here [2].

Piotrek


On 28 Apr 2020, at 18:45, Timo Walther <[hidden email]> wrote:

Hi Manas,

Reg. 1: I would recommend to use a debugger in your IDE and check which watermarks are travelling through your operators.

Reg. 2: All event-time operations are only performed once the watermark arrived from all parallel instances. So roughly speaking, in machine time you can assume that the window is computed in watermark update intervals. However, "what is computed" depends on the timestamps of your events and how those are categorized in windows.

I hope this helps a bit.

Regards,
Timo

On 28.04.20 14:38, Manas Kale wrote:
Hi David and Piotrek,
Thank you both for your inputs.
I tried an implementation with the algorithm Piotrek suggested and David's example. Although notifications are being generated with the watermark, subsequent transition events are being received after the watermark has crossed their timestamps. For example:
state1 @ 100
notification state1@ 110
notification state1@ 120
notification state1@ 130    <----- shouldn't have emitted this
state2 @ 125                     <----- watermark is > 125 at this stage
I think something might be subtly(?) wrong with how I have structured upstream operators. The allowed lateness is 0 in the watermarkassigner upstream, and I generate watermarks every x seconds.
The operator that emits state transitions is constructed using the TumblingWindow approach I described in the first e-mail (so that I can compute at every watermark update). Note that I can use this approach for state-transition-operator because it only wants to emit transitions, and nothing in between.
So, two questions:
1. Any idea on what might be causing this incorrect watermark behaviour?
2. If I want to perform some computation only when the watermark updates, is using a watermark-aligned EventTimeTumblingWindow (meaning windowDuration = watermarkUpdateInterval) the correct way to do this?
Regards,
Manas
On Tue, Apr 28, 2020 at 2:16 AM David Anderson <[hidden email] <[hidden email]>> wrote:
   Following up on Piotr's outline, there's an example in the
   documentation of how to use a KeyedProcessFunction to implement an
   event-time tumbling window [1]. Perhaps that can help you get started.
   Regards,
   David
   [1]
   https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
   On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <[hidden email]
   <[hidden email]>> wrote:
       Hi,
       I’m not sure, but I don’t think there is an existing window that
       would do exactly what you want. I would suggest to go back to
       the `keyedProcessFunction` (or a custom operator?), and have a
       MapState<TimeStamp, StateWithTimeStamp> currentStates field.
       Your key would be for example a timestamp of the beginning of
       your window. Value would be the latest state in this time
       window, annotated with a timestamp when this state was record.
       On each element:
       1. you determine the window’s begin ts (key of the map)
       2. If it’s first element, register an event time timer to
       publish results for that window’s end TS
       3. look into the `currentStates` if it should be modified (if
       your new element is newer or first value for the given key)
       On even time timer firing
       1. output the state matching to this timer
       2. Check if there is a (more recent) value for next window, and
       if not:
       3. copy the value to next window
       4. Register a timer for this window to fire
       5. Cleanup currentState and remove value for the no longed
       needed key.
       I hope this helps
       Piotrek
       On 27 Apr 2020, at 12:01, Manas Kale <[hidden email]
       <[hidden email]>> wrote:

       Hi,
       I have an upstream operator that outputs device state
       transition messages with event timestamps. Meaning it only
       emits output when a transition takes place.
       For example,
       state1 @ 1 PM
       state2 @ 2 PM
       and so on.

       *Using a downstream operator, I want to emit notification
       messages as per some configured periodicity.* For example, if
       periodicity = 20 min, in the above scenario this operator will
       output :
       state1 notification @ 1PM
       state1 notification @ 1.20PM
       state1 notification @ 1.40PM
        ...

       *Now the main issue is that I want this to be driven by the
       /watermark/ and not by transition events received from
       upstream.* That is, I would like to see notification events as
       soon as the watermark crosses their timestamps, /not/ when the
       next transition event arrives at the operator (which could be
       hours later, as above).
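To make the expected output concrete, here is a small sketch of which notifications are due for a single state. It assumes timestamps in minutes since midnight (so 1 PM is 780) and that a notification is emitted at the transition time and then every `period`; `NotificationSchedule.due` is a hypothetical helper, not a Flink API. While the next transition is still unknown, passing `Long.MAX_VALUE` as `nextTransition` leaves the watermark as the only bound, which is the watermark-driven behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of any Flink API.
final class NotificationSchedule {
    private NotificationSchedule() {}

    // Notification timestamps since, since + period, ... for a state that became
    // active at `since`, strictly before the next transition and no later than
    // the current watermark.
    static List<Long> due(long since, long nextTransition, long watermark, long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        List<Long> result = new ArrayList<>();
        for (long t = since; t < nextTransition && t <= watermark; t += period) {
            result.add(t);
            if (t > Long.MAX_VALUE - period) {
                break; // avoid overflow when the watermark is very large
            }
        }
        return result;
    }
}
```

For example, `due(780, 840, 900, 20)` gives 780, 800 and 820 (1:00, 1:20 and 1:40 PM), and `due(780, Long.MAX_VALUE, 805, 20)` gives 780 and 800. Emitting up to the watermark is safe only because a watermark W asserts that no further on-time transition with a timestamp at or before W will arrive.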

       My first solution, using a keyedProcessFunction and timers, did
       not work as expected because the order in which transition
       events arrived at this operator was non-deterministic. To
       elaborate, assume a setAutoWatermarkInterval of 10 seconds.
       If we get the transition events:
       state1 @ 1 sec
       state2 @ 3 sec
       state3 @ 5 sec
       state1 @ 8 sec
       the order in which these events arrived at my
       keyedProcessFunction was not fixed. To solve this, these
       messages need to be sorted on event time, which led me to my
       second solution.
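Sorting on event time is usually done by buffering rather than with windows. A minimal sketch, assuming a single key and an in-memory buffer (`EventTimeSorter` is a hypothetical name): elements are held in a priority queue ordered by timestamp and released only once the watermark has passed them, so the output order no longer depends on arrival order. In Flink, the same role would typically be played by keyed state plus an event-time timer registered at each element's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not Flink API: buffer transitions and release them in
// event-time order once the watermark has passed them.
final class EventTimeSorter {
    static final class Transition {
        final String state;
        final long timestamp;

        Transition(String state, long timestamp) {
            this.state = state;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return state + "@" + timestamp;
        }
    }

    private final PriorityQueue<Transition> buffer =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private long watermark = Long.MIN_VALUE;

    // Returns false for a late element (timestamp at or behind the current watermark).
    boolean add(String state, long timestamp) {
        if (timestamp <= watermark) {
            return false;
        }
        buffer.add(new Transition(state, timestamp));
        return true;
    }

    // Releases, in timestamp order, every buffered element the watermark has passed.
    List<Transition> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Transition> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().timestamp <= watermark) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

Using the four transitions above (in seconds) arriving in shuffled order, a watermark of 4 releases `state1@1, state2@3`, and a later watermark of 10 releases `state3@5, state1@8`. An element at or behind the current watermark is rejected as late.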

       My second solution, using an EventTimeTumblingWindow with size
       = setAutoWatermarkInterval, also does not work. I sorted the
       accumulated events in the window and applied the
       notification-generation logic to them in order. However, I had
       assumed that windows are created even if they contain no
       elements. Since this is not the case, this solution generates
       notifications only when the next state transition message
       arrives, which could be hours later.

       Does anyone have any suggestions on how I can implement this?
       Thanks!


Re: "Fill in" notification messages based on event time watermark

Aljoscha Krettek
I think there is some confusion in this thread between the auto
watermark interval and the interval (length) of an event-time window.
Maybe clearing that up for everyone helps.

The auto watermark interval is the periodicity (in processing time) at
which Flink asks the source (or a watermark generator) what the current
watermark is. The source keeps track of timestamps so that it has
something to "respond" to Flink when it asks. For example, if the auto
watermark interval is set to 1 sec, Flink will update the watermark
information every second. This doesn't mean, though, that the watermark
advances 1 sec in that time. If you're reading through some historic
data, the watermark could jump by hours between those 1 second ticks. You
can also think of this as the sampling interval for updating the current
watermark.
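A minimal sketch of this sampling behaviour, assuming a generator that reports the maximum timestamp seen so far minus a fixed out-of-orderness bound (similar in spirit to Flink's bounded-out-of-orderness extractors). `PeriodicWatermarkModel` and its methods are hypothetical; `onPeriodicEmit` plays the role of the call Flink makes once per auto watermark interval.

```java
// Hypothetical model (not Flink API) of a periodic, bounded-out-of-orderness
// watermark generator: events only update bookkeeping, and the watermark is
// sampled once per auto watermark interval (processing time).
final class PeriodicWatermarkModel {
    private final long maxOutOfOrderness; // assumed >= 0
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEmitted = Long.MIN_VALUE;

    PeriodicWatermarkModel(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Called for every element; emits nothing by itself.
    void onEvent(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
    }

    // Called once per auto watermark interval. The result only ever moves
    // forward, but it can move by any amount of event time between two calls.
    long onPeriodicEmit() {
        // Skip until an event has been seen (this also avoids underflow).
        if (maxTimestamp > Long.MIN_VALUE + maxOutOfOrderness) {
            lastEmitted = Math.max(lastEmitted, maxTimestamp - maxOutOfOrderness);
        }
        return lastEmitted;
    }
}
```

With a 1 s bound, the first sample after an event at 0 ms yields -1000; if two events an hour apart are then ingested before the next tick, that single tick moves the watermark to 7,199,000 ms, and a tick without new events leaves it where it was.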

The window size is independent of the auto watermark interval; you can
have an arbitrary size here. The auto watermark interval only controls
how frequently Flink checks for, and emits the contents of, windows whose
end timestamp is below the watermark.
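A small hypothetical model (plain Java, not Flink API, zero window offset assumed) makes the separation visible: the window size is a constructor argument, and each watermark update fires every pending window whose max timestamp (end - 1, the point at which Flink's event-time trigger fires) is at or below the watermark. How often `onWatermark` is called is a separate concern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model (not Flink API): which tumbling event-time windows fire
// when a watermark update arrives.
final class WindowFiringModel {
    private final long windowSize;
    private final TreeSet<Long> pendingStarts = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    WindowFiringModel(long windowSize) {
        this.windowSize = windowSize;
    }

    // A window only comes into existence when an element is assigned to it.
    boolean addElement(long timestamp) {
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        if (start + windowSize - 1 <= watermark) {
            return false; // window already fired; dropped, as with zero allowed lateness
        }
        pendingStarts.add(start);
        return true;
    }

    // Fires (and removes) every pending window whose max timestamp is <= watermark.
    List<Long> onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<Long> fired = new ArrayList<>();
        while (!pendingStarts.isEmpty()
                && pendingStarts.first() + windowSize - 1 <= watermark) {
            fired.add(pendingStarts.pollFirst());
        }
        return fired;
    }
}
```

With one-minute windows and elements at 10 s, 70 s, 130 s and 1 h, a watermark of 50 s fires nothing, while a single jump to 3,700,000 ms fires four windows at once. Nothing fires for the empty minutes in between, because a window only exists once an element has been assigned to it.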

I hope that helps. If we're all clear we can look at the concrete
problem again.

Best,
Aljoscha

On 30.04.20 12:46, Manas Kale wrote:

> Hi Timo and Piotrek,
> Thank you for the suggestions.
> I have been trying to set up unit tests at the operator granularity, and
> the blog post's testHarness examples certainly help a lot in this regard.
>
> I understood my problem - an upstream session window operator can only
> report the end of the session window when the watermark has passed
> {lastObserverEvent + sessionTimeout}. However, my watermark was being
> updated periodically without taking this into account. It seems I will have
> to delay this notification operator's watermark by sessionTimeout.
> Another complication is that this sessionTimeout is per-key, so I guess I
> will have to implement a watermark assigner that extracts the delay period
> from data (similar to DynamicEventTimeSessionWindows).
>
> Also, if I do implement such an assigner, would it be helpful to add it to
> Flink? I am happy to contribute if so. Any other comments/observations are
> also welcome!
>
> Thank you all for the help,
> Manas
>
>
> On Wed, Apr 29, 2020 at 3:39 PM Piotr Nowojski <[hidden email]> wrote:
>
>> Hi Manas,
>>
>> Adding to the response from Timo, if you don’t have unit tests/integration
>> tests, I would strongly recommend setting them up, as it makes debugging
>> and testing easier. You can read how to do it for your functions and
>> operators here [1] and here [2].
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
>> [2]
>> https://flink.apache.org/news/2020/02/07/a-guide-for-unit-testing-in-apache-flink.html
>>
>> On 28 Apr 2020, at 18:45, Timo Walther <[hidden email]> wrote:
>>
>> Hi Manas,
>>
>> Reg. 1: I would recommend using a debugger in your IDE and checking which
>> watermarks are travelling through your operators.
>>
>> Reg. 2: All event-time operations are only performed once the watermark
>> has arrived from all parallel instances. So roughly speaking, in machine time
>> you can assume that the window is computed in watermark update intervals.
>> However, "what is computed" depends on the timestamps of your events and
>> how those are categorized in windows.
>>
>> I hope this helps a bit.
>>
>> Regards,
>> Timo
>>
>> On 28.04.20 14:38, Manas Kale wrote:
>>
>> Hi David and Piotrek,
>> Thank you both for your inputs.
>> I tried an implementation with the algorithm Piotrek suggested and David's
>> example. Although notifications are being generated with the watermark,
>> subsequent transition events are being received after the watermark has
>> crossed their timestamps. For example:
>> state1 @ 100
>> notification state1@ 110
>> notification state1@ 120
>> notification state1@ 130    <----- shouldn't have emitted this
>> state2 @ 125                     <----- watermark is > 125 at this stage
>> I think something might be subtly(?) wrong with how I have structured
>> upstream operators. The allowed lateness is 0 in the watermark assigner
>> upstream, and I generate watermarks every x seconds.
>> The operator that emits state transitions is constructed using the
>> TumblingWindow approach I described in the first e-mail (so that I can
>> compute at every watermark update). Note that I can use this approach for
>> state-transition-operator because it only wants to emit transitions, and
>> nothing in between.
>> So, two questions:
>> 1. Any idea on what might be causing this incorrect watermark behaviour?
>> 2. If I want to perform some computation only when the watermark updates,
>> is using a watermark-aligned EventTimeTumblingWindow (meaning
>> windowDuration = watermarkUpdateInterval) the correct way to do this?
>> Regards,
>> Manas
>> On Tue, Apr 28, 2020 at 2:16 AM David Anderson <[hidden email]> wrote:
>>     Following up on Piotr's outline, there's an example in the
>>     documentation of how to use a KeyedProcessFunction to implement an
>>     event-time tumbling window [1]. Perhaps that can help you get started.
>>     Regards,
>>     David
>>     [1]
>>     https://ci.apache.org/projects/flink/flink-docs-master/tutorials/event_driven.html#example
>>     On Mon, Apr 27, 2020 at 7:47 PM Piotr Nowojski <[hidden email]> wrote:
>>         Hi,
>>         I’m not sure, but I don’t think there is an existing window that
>>         would do exactly what you want. I would suggest going back to
>>         the `keyedProcessFunction` (or a custom operator?) and keeping a
>>         MapState<TimeStamp, StateWithTimeStamp> currentStates field.
>>         Your key could be, for example, the timestamp of the beginning
>>         of your window. The value would be the latest state in this time
>>         window, annotated with the timestamp at which this state was
>>         recorded.
>>         On each element:
>>         1. Determine the window’s begin timestamp (the key of the map).
>>         2. If it’s the first element, register an event time timer to
>>         publish the results at that window’s end timestamp.
>>         3. Check `currentStates` to see whether it should be modified (if
>>         the new element is newer, or is the first value for the given key).
>>         On event time timer firing:
>>         1. Output the state matching this timer.
>>         2. Check whether there is a (more recent) value for the next
>>         window, and if not:
>>         3. copy the value to the next window, and
>>         4. register a timer for that window to fire.
>>         5. Clean up currentStates and remove the value for the no longer
>>         needed key.
>>         I hope this helps.
>>         Piotrek
>
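Manas's plan of delaying the notification operator's watermark could look roughly like this. One caveat: watermarks in Flink are not tracked per key, so a per-key session timeout can only be honored by holding back by the largest timeout that applies. `HeldBackWatermark` is a hypothetical plain-Java helper, not a Flink API, and the timeout of 10 in the example is an assumption; the result is kept monotonic because a watermark must never move backwards.

```java
// Hypothetical helper (not Flink API): derive a downstream watermark by holding
// back the incoming one by the largest session timeout observed so far.
final class HeldBackWatermark {
    private long maxTimeout = 0;
    private long lastEmitted = Long.MIN_VALUE;

    // Record the session timeout that applies to some key.
    void observeTimeout(long timeout) {
        maxTimeout = Math.max(maxTimeout, timeout);
    }

    // Returns the delayed watermark; never smaller than a previously returned value.
    long adjust(long incomingWatermark) {
        long candidate = incomingWatermark < Long.MIN_VALUE + maxTimeout
                ? Long.MIN_VALUE                   // avoid underflow near the minimum
                : incomingWatermark - maxTimeout;
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

With a timeout of 10 and an upstream watermark of 134, the notification operator would see 124, so `state2 @ 125` from the example above is still on time. A larger timeout discovered later cannot slow down a watermark that was already emitted, so a fixed upper bound configured up front may be the simpler option.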


Re: "Fill in" notification messages based on event time watermark

Manas Kale
Hi Aljoscha,
Thank you, that clarification helps. I am generating a new watermark in the getCurrentWatermark() method of my assigner, and that is what actually causes the watermark to be updated at every auto watermark interval. I had assumed that watermark updates were caused only by the setAutoWatermarkInterval() setting, which was incorrect. Your explanation makes this clear.
Note that I have shelved this problem for now; I'll reply to this thread if I need help solving it properly later. I don't want to waste anyone's time.

Thanks!

On Mon, May 18, 2020 at 7:59 PM Aljoscha Krettek <[hidden email]> wrote: