WindowOperator - element's timestamp


Petr Novotnik
Hello,

I'm struggling to understand the following behaviour of the
`WindowOperator` and would appreciate some insight from experts:

In particular, I'm thinking about the following hypothetical data flow:

input.keyBy(..)
      .window(TumblingEventTimeWindows.of(..))
      .apply(..)
      ...
      .keyBy(..)
      .window(CustomWindowAssignerUsingATriggerBasedOnTheElementsStamp)
      .apply(..)

When the first window operator fires a window based on its event-time
timer, the emitted elements are assigned a timestamp equal to
`window.maxTimestamp()`. This timestamp is then available to the second
window operator's trigger through the `onElement` method. So far, so good.

However, when the first window operator uses `ContinuousEventTimeTrigger`
(or, more generally, any trigger that fires a window several times during
its lifetime), _all_ elements emitted for that window, whether as a
partial or as the final result, arrive at the downstream operators with
the same timestamp.

This makes it practically impossible to use `ContinuousEventTimeTrigger`
(or a similar trigger) in the second window operator to achieve "early
firing" there as well.

This is surprising. I would expect the elements to be assigned the
timestamp of the timer that fired them (which, for the final firing of a
`TumblingEventTimeWindows` window, is `window.maxTimestamp()` anyway). Is
there a particular reason for unconditionally assigning
`window.maxTimestamp()`?
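The observed and the expected stamping can be contrasted with a toy model in plain Java. It is not Flink code: `PaneStamps` and its methods are hypothetical names, and the firing schedule (every `interval` ms, plus a final firing at `end - 1`) only roughly approximates what `ContinuousEventTimeTrigger` does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Flink code: which timestamp does each firing ("pane") of one
// tumbling window [start, end) carry downstream? All names are hypothetical.
final class PaneStamps {

    // Approximate firing schedule of a continuous event-time trigger:
    // every `interval` ms after `start`, plus a final firing at end - 1.
    static List<Long> firingTimers(long start, long end, long interval) {
        long maxTimestamp = end - 1; // window.maxTimestamp() for [start, end)
        List<Long> timers = new ArrayList<>();
        for (long t = start + interval; t < maxTimestamp; t += interval) {
            timers.add(t);
        }
        timers.add(maxTimestamp);
        return timers;
    }

    // stampWithTimer == false: observed behaviour, every pane gets maxTimestamp.
    // stampWithTimer == true: expected behaviour, each pane gets its timer's time.
    static List<Long> emittedStamps(long start, long end, long interval, boolean stampWithTimer) {
        long maxTimestamp = end - 1;
        List<Long> stamps = new ArrayList<>();
        for (long timer : firingTimers(start, end, interval)) {
            stamps.add(stampWithTimer ? timer : maxTimestamp);
        }
        return stamps;
    }

    public static void main(String[] args) {
        // One-hour window [0, 3_600_000) fired every 15 minutes.
        System.out.println(emittedStamps(0, 3_600_000, 900_000, false));
        // [3599999, 3599999, 3599999, 3599999]
        System.out.println(emittedStamps(0, 3_600_000, 900_000, true));
        // [900000, 1800000, 2700000, 3599999]
    }
}
```

With the first policy all four panes carry the same timestamp, so a downstream trigger that relies on element timestamps cannot tell them apart; with the second they are spread across the window.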

Many thanks in advance,
P.

Re: WindowOperator - element's timestamp

Ufuk Celebi
Looping in Kostas and Aljoscha who should know what's the expected behaviour here ;)


On 11 November 2016 at 16:17:23, Petr Novotnik ([hidden email]) wrote:


Re: WindowOperator - element's timestamp

Aljoscha Krettek
Hi,
I'm afraid the ContinuousEventTimeTrigger is a bit misleading and should probably be removed. The problem is that a watermark T signals that we won't see elements with a timestamp < T in the future. It does not signal that we haven't already seen elements with a timestamp > T. So the watermark cannot be used to trigger at different stages of a given window.
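The watermark argument can be illustrated with a small plain-Java simulation. It assumes a bounded-out-of-orderness watermark (largest timestamp seen so far minus a fixed bound) recomputed after every element, whereas Flink usually emits watermarks periodically; `WatermarkSnapshot` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not Flink code. Watermark = max timestamp seen so far - bound,
// recomputed after every element (an assumption made for simplicity).
final class WatermarkSnapshot {

    // Returns the element timestamps already buffered when the watermark first
    // reaches `t` (i.e. when an event-time timer for `t` would fire), or an
    // empty list if the watermark never gets there.
    static List<Long> bufferedWhenWatermarkReaches(long[] arrivals, long bound, long t) {
        List<Long> buffered = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (long ts : arrivals) {
            buffered.add(ts);
            maxSeen = Math.max(maxSeen, ts);
            if (maxSeen - bound >= t) {
                return buffered;
            }
        }
        return new ArrayList<>();
    }

    // How many buffered elements lie after t in event time.
    static long countLaterThan(List<Long> buffered, long t) {
        return buffered.stream().filter(ts -> ts > t).count();
    }

    public static void main(String[] args) {
        long[] arrivals = {1, 5, 3, 12, 7, 15, 11}; // out-of-order event timestamps
        List<Long> snapshot = bufferedWhenWatermarkReaches(arrivals, 5, 8);
        System.out.println(snapshot);                    // [1, 5, 3, 12, 7, 15]
        System.out.println(countLaterThan(snapshot, 8)); // 2
    }
}
```

When a timer for t = 8 would fire, the buffer already holds 12 and 15 while 11 has not arrived yet, so a pane fired "at 8" does not correspond to a well-defined event-time prefix of the window.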

Do you have a concrete use case in mind for which you wanted to use ContinuousEventTimeTrigger?

Cheers,
Aljoscha

On Mon, 14 Nov 2016 at 09:58 Ufuk Celebi <[hidden email]> wrote:

Re: WindowOperator - element's timestamp

Petr Novotnik
Aljoscha,

thanks for your response. The use case I'm after is basically providing
"early" (inaccurate) results to downstream consumers. Suppose we're
running aggregations over daily time windows but don't want to wait a
whole day to see results. The idea is to fire the windows continuously
before they reach their end of life, at which point they will be fired
and purged and will provide the final, accurate answer.

However, if all of these "early" panes emit elements with a timestamp
equal to the end of the window, stateful downstream operators (a) have no
way to distinguish between the different panes of the same window, and
(b) have no way to set up timers that fire before the watermark at the
downstream operator advances to the "end of the day".

Hope this clarifies my motivation a bit,
P.

On 11/14/2016 03:22 PM, Aljoscha Krettek wrote:


Re: WindowOperator - element's timestamp

Aljoscha Krettek
Hi,
I understand now. For early (speculative) firing, I would suggest writing a custom trigger that repeatedly fires on processing time. We're also working on a Trigger DSL that will make such cases simpler; for example, you would be able to write:
  window.trigger(EventTime.pastEndOfWindow().withEarlyFiring(ProcessingTime.after(Time.minutes(5))))
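The custom processing-time trigger suggested above can be sketched, under assumptions, as its decision logic alone. This is a plain-Java stand-in rather than an implementation of Flink's `Trigger` class: the `Result` enum only borrows the names of Flink's `TriggerResult` values, the class and method names are hypothetical, and timer registration is reduced to remembering the next processing-time deadline.

```java
// Plain-Java model of an "early firing" trigger's decisions (not Flink API).
// Speculative FIRE on processing-time timers every `earlyIntervalMs`;
// final FIRE_AND_PURGE when the event-time timer at the window's max timestamp fires.
final class EarlyFiringTriggerModel {

    // Names borrowed from Flink's TriggerResult for readability.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private static final long NO_TIMER = Long.MAX_VALUE;

    private final long windowMaxTimestamp;
    private final long earlyIntervalMs;
    private long nextEarlyFiring = NO_TIMER;

    EarlyFiringTriggerModel(long windowMaxTimestamp, long earlyIntervalMs) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.earlyIntervalMs = earlyIntervalMs;
    }

    // The first element of the window arms the processing-time timer.
    Result onElement(long processingTimeNow) {
        if (nextEarlyFiring == NO_TIMER) {
            nextEarlyFiring = processingTimeNow + earlyIntervalMs;
        }
        return Result.CONTINUE;
    }

    // A processing-time timer fired: emit a speculative pane and re-arm.
    Result onProcessingTime(long time) {
        if (nextEarlyFiring == NO_TIMER || time < nextEarlyFiring) {
            return Result.CONTINUE;
        }
        nextEarlyFiring = time + earlyIntervalMs;
        return Result.FIRE;
    }

    // An event-time timer fired: only the end-of-window timer yields the final pane.
    Result onEventTime(long time) {
        if (time >= windowMaxTimestamp) {
            nextEarlyFiring = NO_TIMER; // window is purged, stop early firings
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE;
    }

    public static void main(String[] args) {
        // Daily window [0, 86_400_000), early results every 5 minutes.
        EarlyFiringTriggerModel trigger = new EarlyFiringTriggerModel(86_399_999L, 300_000L);
        System.out.println(trigger.onElement(1_000L));          // CONTINUE
        System.out.println(trigger.onProcessingTime(301_000L)); // FIRE
        System.out.println(trigger.onProcessingTime(400_000L)); // CONTINUE
        System.out.println(trigger.onEventTime(86_399_999L));   // FIRE_AND_PURGE
    }
}
```

In a real Flink trigger these decisions would map onto `onElement`, `onProcessingTime` and `onEventTime`, with timers registered through the trigger context. Going by the behaviour described earlier in the thread, the early panes would still be stamped with `window.maxTimestamp()`, so this gives early results but does not by itself solve the downstream stamping issue.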

We're also working on FLIP-2 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata) to provide more metadata for window processing. Of particular interest to you might be the firing reason and perhaps the firing counter.
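To illustrate why a firing reason and a firing counter would help with telling panes of the same window apart, here is a hypothetical plain-Java sketch of a downstream consumer that keeps only the newest pane per window. `LatestPaneTable`, `PaneResult` and `FiringReason` are invented for this example; they are not part of Flink or of the FLIP-2 proposal.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink API): window results that carry a firing reason
// and a per-window firing counter, consumed by a table that keeps the newest pane.
final class LatestPaneTable {

    enum FiringReason { EARLY, ON_TIME }

    static final class PaneResult {
        final long windowEnd;     // identifies the window the pane belongs to
        final int firingCounter;  // 0 for the first firing of a window, then 1, 2, ...
        final FiringReason reason;
        final long value;         // the (partial or final) aggregate

        PaneResult(long windowEnd, int firingCounter, FiringReason reason, long value) {
            this.windowEnd = windowEnd;
            this.firingCounter = firingCounter;
            this.reason = reason;
            this.value = value;
        }
    }

    private final Map<Long, PaneResult> latest = new HashMap<>();

    // Accepts a pane only if it is newer than the one already stored for its window.
    boolean accept(PaneResult pane) {
        PaneResult current = latest.get(pane.windowEnd);
        if (current != null && current.firingCounter >= pane.firingCounter) {
            return false; // duplicate or older pane
        }
        latest.put(pane.windowEnd, pane);
        return true;
    }

    // True once the ON_TIME (final) pane for the window has been accepted.
    boolean isFinal(long windowEnd) {
        PaneResult current = latest.get(windowEnd);
        return current != null && current.reason == FiringReason.ON_TIME;
    }

    // Latest known value for the window, or null if no pane has arrived yet.
    Long currentValue(long windowEnd) {
        PaneResult current = latest.get(windowEnd);
        return current == null ? null : Long.valueOf(current.value);
    }

    public static void main(String[] args) {
        LatestPaneTable table = new LatestPaneTable();
        long day = 86_400_000L; // end of the first daily window
        table.accept(new PaneResult(day, 0, FiringReason.EARLY, 10));
        table.accept(new PaneResult(day, 1, FiringReason.EARLY, 25));
        System.out.println(table.accept(new PaneResult(day, 0, FiringReason.EARLY, 10))); // false
        table.accept(new PaneResult(day, 2, FiringReason.ON_TIME, 40));
        System.out.println(table.isFinal(day) + " " + table.currentValue(day)); // true 40
    }
}
```

The counter orders panes even when they all share the same element timestamp, and the reason marks which pane is final.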

Cheers,
Aljoscha

On Mon, 14 Nov 2016 at 21:54 Petr Novotnik <[hidden email]> wrote: