[DISCUSS] Allowed Lateness in Flink

8 messages

Aljoscha Krettek
Hi Folks,
as part of my effort to improve the windowing in Flink [1], I also thought about lateness, accumulating/discarding, and window cleanup. I have some ideas on this, but I would love to get feedback from the community, as I think these things are important for everyone doing event-time windowing on Flink.

The basic problem is this: some elements can arrive behind the watermark if the watermark is not 100% correct (which, I would assume, it is not in most cases). We need to provide an API that lets users specify what happens when these late elements arrive. There are two main knobs for the user here:

- Allowed Lateness: How late can an element be before it is completely ignored, i.e. simply discarded?

- Accumulating/Discarding Fired Windows: When we fire a window, do we purge its contents or do we keep them around until the watermark passes the end of the window plus the allowed lateness? If we keep the window, a late element will be added to it and the window will be emitted again. If we don't keep the window, the late element will essentially trigger the emission of a one-element window.
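A minimal sketch of the two knobs, assuming a single event-time window that every element is assigned to. The class `LatenessSketch` and its methods are hypothetical and not Flink API; the convention assumed here is that the window fires once the watermark reaches `windowEnd` and its state is dropped once the watermark reaches `windowEnd + allowedLateness`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-window model (not Flink's WindowOperator).
// Assumptions: every element belongs to this one window; the window fires
// once the watermark reaches windowEnd; its state is dropped once the
// watermark reaches windowEnd + allowedLateness.
class LatenessSketch {
    enum Mode { ACCUMULATING, DISCARDING }

    private final long windowEnd;
    private final long allowedLateness;
    private final Mode mode;
    private final List<Integer> contents = new ArrayList<>();
    final List<List<Integer>> emitted = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;
    private boolean fired = false;
    private boolean cleanedUp = false;

    LatenessSketch(long windowEnd, long allowedLateness, Mode mode) {
        this.windowEnd = windowEnd;
        this.allowedLateness = allowedLateness;
        this.mode = mode;
    }

    // Returns false if the element is dropped for exceeding the allowed lateness.
    boolean onElement(int value) {
        if (cleanedUp) {
            return false;
        }
        contents.add(value);
        if (fired) {
            fire(); // late but within the allowed lateness: emit the window again
        }
        return true;
    }

    void onWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (!fired && watermark >= windowEnd) {
            fired = true;
            fire(); // regular on-time firing
        }
        if (!cleanedUp && watermark >= windowEnd + allowedLateness) {
            contents.clear(); // garbage-collect the window state
            cleanedUp = true;
        }
    }

    private void fire() {
        emitted.add(new ArrayList<>(contents));
        if (mode == Mode.DISCARDING) {
            contents.clear(); // discarding: purge the contents after each firing
        }
    }
}
```

With `windowEnd = 10` and `allowedLateness = 5`: feeding 1 and 2, advancing the watermark to 10, then feeding 3 emits [1, 2] and then [1, 2, 3] in accumulating mode, but [1, 2] and then [3] in discarding mode. Once the watermark reaches 15, any further element is dropped.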

This is fairly straightforward to implement: if accumulating, set a timer for the end of the window plus the allowed lateness, and clean up the window when that timer fires (basically). All of this happens in event time, driven by watermarks.
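The cleanup scheme could be sketched as follows, assuming one allowed lateness per operator and one event-time timer per window. `CleanupTimerSketch`, `cleanupTime` and `advanceWatermark` are hypothetical names, and the overflow guard in `cleanupTime` is an extra assumption so that a very large lateness does not wrap around.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (names are illustrative, not Flink API): one event-time
// cleanup timer per window at windowEnd + allowedLateness; when the watermark
// passes that timer, the window's state is deleted.
class CleanupTimerSketch {
    private final long allowedLateness;
    // Pending event-time timers, ordered by timestamp: cleanup time -> window ends.
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>();
    // Window state keyed by window end; reduced to an element count here.
    final Map<Long, Integer> windowState = new HashMap<>();
    private long watermark = Long.MIN_VALUE;

    CleanupTimerSketch(long allowedLateness) {
        this.allowedLateness = allowedLateness;
    }

    static long cleanupTime(long windowEnd, long allowedLateness) {
        long t = windowEnd + allowedLateness;
        return t >= windowEnd ? t : Long.MAX_VALUE; // guard against long overflow
    }

    // Adds an element to the window ending at windowEnd; returns false if it is dropped.
    boolean addElement(long windowEnd) {
        long cleanup = cleanupTime(windowEnd, allowedLateness);
        if (watermark >= cleanup) {
            return false; // window already garbage-collected: too late
        }
        if (!windowState.containsKey(windowEnd)) {
            // First element for this window: register its cleanup timer.
            timers.computeIfAbsent(cleanup, k -> new ArrayList<>()).add(windowEnd);
        }
        windowState.merge(windowEnd, 1, Integer::sum);
        return true;
    }

    // Advances the watermark and fires every cleanup timer it has passed.
    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (long windowEnd : timers.pollFirstEntry().getValue()) {
                windowState.remove(windowEnd);
            }
        }
    }
}
```

Keeping the timers in a sorted map means a watermark only has to inspect the earliest pending cleanup times rather than scanning every window.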

My problem is only this: what should happen if the user specifies some allowed lateness and/or accumulating mode but uses processing-time windowing? For processing-time windows these settings don't make sense, because elements cannot be late by definition. The problem is that we cannot figure out, by looking at a WindowAssigner or the windows that it assigns to elements, whether these windows are in the event-time or the processing-time domain. At the API level this is not easily visible either, since a user might have set the "stream-time-characteristic" to event-time but still use a processing-time window (plus trigger) in the program.

Any ideas for solving this are extremely welcome. :-)

Cheers,
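Aljoscha

[1] https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit#heading=h.psfzjlv68tp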

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek
By the way, the way I see of fixing this is extending WindowAssigner with an "isEventTime()" method and then allowing accumulating/lateness in the WindowOperator only if this returns true.

But it seems a bit hacky because it special-cases event time. But then again, maybe we need to special-case it ...
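A rough sketch of the isEventTime() idea, using hypothetical stand-in classes (`SketchWindowAssigner`, `SketchWindowOperatorFactory`) rather than Flink's real WindowAssigner and WindowOperator. Rejecting the configuration with an exception is one possible reaction assumed here; the proposal itself only says that lateness/accumulating would be allowed when the flag is true.

```java
// Hypothetical, pared-down types illustrating the proposal; these are not Flink's classes.
abstract class SketchWindowAssigner {
    // Proposed flag: true if the windows this assigner creates live in event time.
    abstract boolean isEventTime();
}

class SketchEventTimeWindows extends SketchWindowAssigner {
    @Override
    boolean isEventTime() { return true; }
}

class SketchProcessingTimeWindows extends SketchWindowAssigner {
    @Override
    boolean isEventTime() { return false; }
}

final class SketchWindowOperatorFactory {
    // Accepts lateness/accumulating settings only for event-time assigners.
    static void validate(SketchWindowAssigner assigner, long allowedLateness, boolean accumulating) {
        if (allowedLateness < 0) {
            throw new IllegalArgumentException("allowedLateness must be >= 0");
        }
        if (!assigner.isEventTime() && (allowedLateness > 0 || accumulating)) {
            throw new IllegalArgumentException(
                "allowed lateness and accumulating mode require an event-time WindowAssigner");
        }
    }
}
```

Note that the check is only as trustworthy as each assigner's own isEventTime() answer; a custom assigner that returns the wrong value passes validation.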

On Tue, 5 Apr 2016 at 12:23 Aljoscha Krettek <[hidden email]> wrote:

Re: [DISCUSS] Allowed Lateness in Flink

Maximilian Michels
Hi Aljoscha,

Thank you for the detailed design document.

Wouldn't it be ok to allow these new concepts regardless of the time
semantics? For Event Time and Ingestion Time, "Lateness" and
"Accumulating/Discarding" make sense. If the user chooses Processing
Time, then these can be ignored during translation of the StreamGraph
(possibly with a warning).
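For comparison, the "ignore with a warning" alternative might look like this, assuming the time domain is already known at translation time (which is exactly the part that is hard to determine). `LatenessTranslationSketch` and its nested types are hypothetical.

```java
import java.util.List;

// Hypothetical translation step (not Flink's StreamGraph code): lateness and
// accumulating mode are kept for event and ingestion time, and dropped with a
// warning for processing time. The time domain is assumed to be known and is
// simply passed in.
final class LatenessTranslationSketch {
    enum TimeDomain { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    static final class WindowSettings {
        final long allowedLateness;
        final boolean accumulating;

        WindowSettings(long allowedLateness, boolean accumulating) {
            this.allowedLateness = allowedLateness;
            this.accumulating = accumulating;
        }
    }

    static WindowSettings translate(TimeDomain domain, WindowSettings requested, List<String> warnings) {
        if (domain != TimeDomain.PROCESSING_TIME) {
            return requested; // lateness is meaningful: keep the user's settings
        }
        if (requested.allowedLateness > 0 || requested.accumulating) {
            warnings.add("Ignoring allowed lateness/accumulating mode for a processing-time window");
        }
        return new WindowSettings(0, false);
    }
}
```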

Detecting when these concepts make sense should be possible by
checking the "Stream Characteristics" of the ExecutionEnvironment or
the involved classes (e.g. SlidingProcessingTimeWindows) in the
StreamGraph. If the user uses a custom WindowAssigner, then the user
has to take care that it is used correctly. I don't like the
"isEventTime()" method. Even with the additional method, users could
return 'true' there although they meant 'false', right? So this does
not really solve the problem that it is hard to distinguish Event Time
and Processing Time semantics in Flink.

Another approach that I could think of is getting rid of
'System.currentTimeMillis()' and only allowing access to time via a
special interface that WindowAssigners implement. Then we could determine
what time is assigned and also verify that it is actually used (in contrast
to the isEventTime() method). Would that be an option, or would it
break the API?
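A sketch of the idea with hypothetical names (`TimeService`, `FixedTimeService`, `RecordingTimeService`, `ProcessingTimeTumblingAssigner`): because the assigner can only read time through the injected service, a recording wrapper can observe which kind of time it actually uses, instead of trusting a declared flag.

```java
// Hypothetical interfaces illustrating the idea: components never call
// System.currentTimeMillis() directly but obtain time through an injected
// service, so the framework can observe which notion of time they use.
interface TimeService {
    long currentProcessingTime();
    long currentWatermark();
}

final class FixedTimeService implements TimeService {
    private final long processingTime;
    private final long watermark;

    FixedTimeService(long processingTime, long watermark) {
        this.processingTime = processingTime;
        this.watermark = watermark;
    }

    @Override public long currentProcessingTime() { return processingTime; }
    @Override public long currentWatermark() { return watermark; }
}

// Wrapper that records which kind of time a component actually asked for.
final class RecordingTimeService implements TimeService {
    private final TimeService delegate;
    boolean usedProcessingTime = false;
    boolean usedWatermark = false;

    RecordingTimeService(TimeService delegate) { this.delegate = delegate; }

    @Override public long currentProcessingTime() {
        usedProcessingTime = true;
        return delegate.currentProcessingTime();
    }

    @Override public long currentWatermark() {
        usedWatermark = true;
        return delegate.currentWatermark();
    }
}

// Hypothetical processing-time tumbling assigner that only sees time via the service.
final class ProcessingTimeTumblingAssigner {
    private final long size;

    ProcessingTimeTumblingAssigner(long size) { this.size = size; }

    long assignWindowStart(TimeService time) {
        long now = time.currentProcessingTime();
        return now - Math.floorMod(now, size); // start of the window containing 'now'
    }
}
```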

Cheers,
Max

On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <[hidden email]> wrote:

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek
Hi Max,
thanks for the feedback and suggestions! I'll try to address each paragraph separately.

I'm afraid deciding based on the "StreamTimeCharacteristic" is not possible, since a user can use processing-time windows in their job even though they set the characteristic to event time. Enabling event time does not disable processing time; it just enables an additional feature. (IMHO, the handling of the StreamTimeCharacteristic is still somewhat problematic.)

Making the decision based purely on the class of the WindowAssigner is also not possible since we don't know in advance which WindowAssigners the users will write and what time characteristic they will use.

Regarding the third proposition: removing 'System.currentTimeMillis()' is very desirable and part of my proposal. However, the replacement is still meant to be separate from "event-time", since a Trigger/WindowAssigner might need both. For example, a Trigger might want to fire early a few (processing-time) seconds after the first elements arrive, and fire properly once the watermark for the end of the window arrives.
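The mixed trigger from this example might be sketched like this, assuming the operator calls `onElement`, `onProcessingTime` and `onWatermark` as the two clocks advance. The class and its methods are hypothetical and not Flink's Trigger interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trigger logic (not Flink's Trigger API) that needs both time
// domains: an early firing earlyDelayMs of processing time after the first
// element, plus the regular firing once the watermark reaches windowEnd.
final class EarlyAndOnTimeTriggerSketch {
    enum Firing { EARLY, ON_TIME }

    private static final long UNSET = Long.MIN_VALUE;
    private final long windowEnd;
    private final long earlyDelayMs;
    private long earlyTimer = UNSET; // processing-time timer
    private boolean earlyFired = false;
    private boolean onTimeFired = false;
    final List<Firing> firings = new ArrayList<>();

    EarlyAndOnTimeTriggerSketch(long windowEnd, long earlyDelayMs) {
        this.windowEnd = windowEnd;
        this.earlyDelayMs = earlyDelayMs;
    }

    // Called per element with the current processing time.
    void onElement(long processingTimeNow) {
        if (earlyTimer == UNSET) {
            earlyTimer = processingTimeNow + earlyDelayMs; // first element: schedule early firing
        }
    }

    // Processing-time clock advanced; skip the early firing if the on-time one already happened.
    void onProcessingTime(long now) {
        if (earlyTimer != UNSET && !earlyFired && !onTimeFired && now >= earlyTimer) {
            earlyFired = true;
            firings.add(Firing.EARLY);
        }
    }

    // Event-time clock (watermark) advanced.
    void onWatermark(long watermark) {
        if (!onTimeFired && watermark >= windowEnd) {
            onTimeFired = true;
            firings.add(Firing.ON_TIME);
        }
    }
}
```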

These are good ideas but I'm afraid we still don't have a good solution. This whole processing time/event time business is just very tricky.

Cheers,
Aljoscha

On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <[hidden email]> wrote:

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek
Hi,
I created a new doc specifically about the interplay of lateness and window state garbage collection: https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing

There are still some things that need to be figured out, both in the new doc and in the existing doc. For example, we need to decide whether to make accumulating/discarding behavior global for a window operation or controllable by triggers. Initially, I suggested making accumulating/discarding a global setting for the window operation, because we can get away with keeping less state if we know that we always discard when firing. Please take a look at the new doc to see what I mean.

Feedback very welcome!

Cheers,
Aljoscha

On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <[hidden email]> wrote:

Re: [DISCUSS] Allowed Lateness in Flink

Gyula Fóra
Thanks Aljoscha :) I added some comments that might be relevant from the users' point of view.

Gyula

On Mon, 30 May 2016 at 10:33 Aljoscha Krettek <[hidden email]> wrote:

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek
Thanks for the feedback! :-) I already read the comments on the file.

On Mon, 30 May 2016 at 11:10 Gyula Fóra <[hidden email]> wrote:
Thanks Aljoscha :) I added some comments that might seem relevant from the
users point of view.

Gyula

Aljoscha Krettek <[hidden email]> ezt írta (időpont: 2016. máj. 30.,
H, 10:33):

> Hi,
> I created a new doc specifically about the interplay of lateness and
> window state garbage collection:
> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>
> There is still some stuff that needs to be figured out, both in the new
> doc and the existing doc. For example, we need to decide whether to make
> accumulating/discarding behavior global for a window operation or
> controllable by triggers. Initially, I suggested to make
> accumulating/discarding a global setting for the window operation because
> we can get away with keeping less state if we know that we always discard
> when firing. Please take a look at the new doc to see what I'm talking
> about there.
>
> Feedback very welcome!
>
> Cheers,
> Aljoscha
>
> On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <[hidden email]> wrote:
>
>> Hi Max,
>> thanks for the Feedback and suggestions! I'll try and address each
>> paragraph separately.
>>
>> I'm afraid deciding based on the "StreamTimeCharacteristic is not
>> possible since a user can use processing-time windows in their job even
>> though the set the characteristic to event-time. Enabling event time does
>> not disable processing time, it just enables an additional feature. (IMHO,
>> the handling of the StreamTimeCharacteristic is still somewhat problematic.)
>>
>> Making the decision based purely on the class of the WindowAssigner is
>> also not possible since we don't know in advance which WindowAssigners the
>> users will write and what time characteristic they will use.
>>
>> Regarding the third proposition. Removing 'System.currentTimeMillis()' is
>> very desirable and part of my proposal. However, it is still meant as being
>> separate from "event-time" since a Trigger/WindowAssigner might need both.
>> For example, a Trigger might want to do early triggering a few
>> (processing-time) seconds after the first elements arrive and proper
>> triggering once the watermark for the end of the window arrives.
>>
>> These are good ideas but I'm afraid we still don't have a good solution.
>> This whole processing time/event time business is just very tricky.
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 26 Apr 2016 at 16:26 Maximilian Michels <[hidden email]> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Thank you for the detailed design document.
>>>
>>> Wouldn't it be ok to allow these new concepts regardless of the time
>>> semantics? For Event Time and Ingestion Time "Lateness" and
>>> "Accumulating/Discarding" make sense. If the user chooses Processing
>>> time then these can be ignored during translation of the StreamGraph
>>> (possibly with a warning).
>>>
>>> Detecting when these concepts make sense should be possible by
>>> checking the "Stream Charateristics" of the ExecutionEnvironment or
>>> the involved classes (e.g. SlidingProcessingTimeWindows) in the
>>> StreamGraph. If the users uses a custom WindowAssigner then the user
>>> has to take care that it is used correctly. I don't like the
>>> "isEventTime()" method. Even with the additional method, users could
>>> return 'true' there although they meant 'false', right? So this does
>>> not really solve the problem that it is hard to distinguish Event Time
>>> and Processing Time semantics in Flink.
>>>
>>> Another approach that I could think of is getting rid of
>>> 'System.currentTimeMillis()' and only allow to get time via a special
>>> interface that WindowAssigners implement. Then we could determine what
>>> time is assigned and also verify that it is actually used (in contrast
>>> to the isEventTime() method). Would that be an option or would it
>>> break the API?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Tue, Apr 5, 2016 at 12:29 PM, Aljoscha Krettek <[hidden email]>
>>> wrote:
>>> > By the way. The way I see to fixing this is extending WindowAssigner
>>> with
>>> > an "isEventTime()" method and then allow accumulating/lateness in the
>>> > WindowOperator only if this is true.
>>> >
>>> > But it seems a but hacky because it special cases event-time. But then
>>> > again, maybe we need to special case it ...
>>> >
>>>
>>

Re: [DISCUSS] Allowed Lateness in Flink

Aljoscha Krettek
Hi,
I cleaned up the document a bit and added sections to address comments on the doc:
https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing (I also marked proposed features that are already implemented as [done].)

The main thing that remains to be figured out is how we deal with purging, i.e. whether the trigger can decide to purge a window or whether the WindowOperator should do this, and also what happens when window state is garbage-collected. The original proposal was to reduce the current set of trigger results from (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) to (CONTINUE, FIRE) and have a global flag in the WindowOperator that says whether fired windows should be purged (DISCARDING) or kept for a bit, until the allowed lateness expires (ACCUMULATING). Based on comments by Elias, I added a section that sketches an alternative where the triggers are in charge of purging and also decide what should happen in case of window cleanup.
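The original proposal (trigger results reduced to CONTINUE and FIRE, with purging governed by an operator-wide DISCARDING/ACCUMULATING flag) could look roughly like this. The enums and `WindowPane` are hypothetical and assume a window whose contents are integers that get summed on firing; they only illustrate that the purge decision lives in the operator, not in the trigger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduced trigger result set from the proposal.
enum SimpleTriggerResult { CONTINUE, FIRE }

// Hypothetical operator-wide flag replacing PURGE / FIRE_AND_PURGE.
enum FiringMode { DISCARDING, ACCUMULATING }

// One window's buffered contents plus the operator-level purge decision.
final class WindowPane {
    private final FiringMode mode;
    private final List<Integer> contents = new ArrayList<>();

    WindowPane(FiringMode mode) {
        this.mode = mode;
    }

    void add(int value) {
        contents.add(value);
    }

    // Applies a trigger decision; returns the emitted sum, or null if nothing is emitted.
    Integer onTriggerResult(SimpleTriggerResult result) {
        if (result == SimpleTriggerResult.CONTINUE) {
            return null;
        }
        int sum = 0;
        for (int v : contents) {
            sum += v;
        }
        // The operator, not the trigger, decides whether fired contents are purged.
        if (mode == FiringMode.DISCARDING) {
            contents.clear();
        }
        return sum;
    }

    int bufferedCount() {
        return contents.size();
    }
}
```

With the same inputs (1 and 2, a FIRE, then a late 4 and another FIRE), an ACCUMULATING pane emits 3 and then 7, while a DISCARDING pane emits 3 and then 4, i.e. the one-element result described in the opening post.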

One thing we should also keep in mind is how we can make the windowing API easy to use for people who don't need all the bells and whistles of custom triggers, allowed lateness and so on. This is partially covered by the proposal to add composite triggers, but I feel we can go further there.

In the future, it might be good to have discussions directly on the mailing list (ML) and then change the document accordingly. This way everyone can follow the discussion on the ML. I also feel that Google Doc comments often don't give enough space for expressing more complex opinions.

Cheers,
Aljoscha


On Mon, 30 May 2016 at 11:23 Aljoscha Krettek <[hidden email]> wrote:
Thanks for the feedback! :-) I already read the comments on the file.

On Mon, 30 May 2016 at 11:10 Gyula Fóra <[hidden email]> wrote:
Thanks, Aljoscha :) I added some comments that might be relevant from the
users' point of view.

Gyula

On Mon, 30 May 2016 at 10:33 Aljoscha Krettek <[hidden email]> wrote:

> Hi,
> I created a new doc specifically about the interplay of lateness and
> window state garbage collection:
> https://docs.google.com/document/d/1vgukdDiUco0KX4f7tlDJgHWaRVIU-KorItWgnBapq_8/edit?usp=sharing
>
> There is still some stuff that needs to be figured out, both in the new
> doc and the existing doc. For example, we need to decide whether to make
> accumulating/discarding behavior global for a window operation or
> controllable by triggers. Initially, I suggested making
> accumulating/discarding a global setting for the window operation because
> we can get away with keeping less state if we know that we always discard
> when firing. Please take a look at the new doc to see what I'm talking
> about there.
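To make the state argument concrete, here is a hypothetical single-key simulator of event-time tumbling windows with allowed lateness (`LatenessSimulator` is an illustrative name, not Flink's WindowOperator). Assumptions: a window fires when the watermark reaches its last timestamp, its state is cleaned up once the watermark passes that timestamp plus the allowed lateness, and elements arriving after cleanup are dropped. In discarding mode a pane is removed right after it fires, so less state is retained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-key simulator of event-time tumbling windows with
// allowed lateness (not Flink's WindowOperator). Emits "start:sum" strings.
final class LatenessSimulator {
    private static final class Pane {
        final List<Integer> values = new ArrayList<>();
        boolean fired;
    }

    private final long size;
    private final long allowedLateness;
    private final boolean accumulating;
    private final TreeMap<Long, Pane> panes = new TreeMap<>(); // keyed by window start
    private long watermark = Long.MIN_VALUE;
    final List<String> emitted = new ArrayList<>();
    int dropped;

    LatenessSimulator(long size, long allowedLateness, boolean accumulating) {
        this.size = size;
        this.allowedLateness = allowedLateness;
        this.accumulating = accumulating;
    }

    private long maxTimestamp(long start) { return start + size - 1; }

    // The cleanup "timer": end of window plus allowed lateness.
    private long cleanupTime(long start) { return maxTimestamp(start) + allowedLateness; }

    void processElement(long timestamp, int value) {
        long start = timestamp - Math.floorMod(timestamp, size);
        if (cleanupTime(start) <= watermark) { // later than the allowed lateness
            dropped++;
            return;
        }
        Pane pane = panes.computeIfAbsent(start, k -> new Pane());
        pane.values.add(value);
        if (maxTimestamp(start) <= watermark) { // late but allowed: re-emit now
            fire(start, pane);
            if (!accumulating) panes.remove(start);
        }
    }

    void processWatermark(long newWatermark) {
        watermark = newWatermark;
        Iterator<Map.Entry<Long, Pane>> it = panes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Pane> e = it.next();
            long start = e.getKey();
            Pane pane = e.getValue();
            if (!pane.fired && maxTimestamp(start) <= watermark) fire(start, pane);
            // Discarding panes are dropped after firing; accumulating ones at cleanup time.
            if (cleanupTime(start) <= watermark || (!accumulating && pane.fired)) it.remove();
        }
    }

    private void fire(long start, Pane pane) {
        int sum = 0;
        for (int v : pane.values) sum += v;
        emitted.add(start + ":" + sum);
        pane.fired = true;
        if (!accumulating) pane.values.clear(); // discarding: purge on fire
    }

    int panesInState() { return panes.size(); }
}
```

For example, with a window size of 10 and an allowed lateness of 5 (same time unit), elements at timestamps 3 and 7 followed by a watermark of 10 leave one pane in state in accumulating mode and none in discarding mode.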
>
> Feedback very welcome!
>
> Cheers,
> Aljoscha
>
> On Tue, 26 Apr 2016 at 16:45 Aljoscha Krettek <[hidden email]> wrote:
>
>> Hi Max,
>> thanks for the feedback and suggestions! I'll try to address each
>> paragraph separately.
>>
>> I'm afraid deciding based on the "StreamTimeCharacteristic" is not
>> possible, since a user can use processing-time windows in their job even
>> though they set the characteristic to event-time. Enabling event time does
>> not disable processing time, it just enables an additional feature. (IMHO,
>> the handling of the StreamTimeCharacteristic is still somewhat problematic.)
>>
>> Making the decision based purely on the class of the WindowAssigner is
>> also not possible since we don't know in advance which WindowAssigners the
>> users will write and what time characteristic they will use.
>>
>> Regarding the third proposal: removing 'System.currentTimeMillis()' is
>> very desirable and part of my proposal. However, it is still meant to be
>> separate from "event-time", since a Trigger/WindowAssigner might need both.
>> For example, a Trigger might want to do early triggering a few
>> (processing-time) seconds after the first elements arrive and proper
>> triggering once the watermark for the end of the window arrives.
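The example of a trigger that needs both time domains might be sketched as follows. The class and its callbacks are hypothetical and standalone, only loosely modeled on the per-element and per-timer callbacks a trigger receives: an early firing a fixed processing-time delay after the first element, and an on-time firing once the watermark reaches the window's last timestamp.

```java
// Hypothetical trigger needing both time domains: one early firing a fixed
// processing-time delay after the first element, and one on-time firing
// when the watermark reaches the window's last timestamp.
final class EarlyAndOnTimeTrigger {
    enum Result { CONTINUE, FIRE }

    private final long earlyDelayMillis;
    private final long windowMaxTimestamp;
    private long earlyTimerAt = Long.MAX_VALUE; // processing-time timer, unset
    private boolean earlyFired;
    private boolean onTimeFired;

    EarlyAndOnTimeTrigger(long earlyDelayMillis, long windowMaxTimestamp) {
        this.earlyDelayMillis = earlyDelayMillis;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Called for every element with the current processing time.
    Result onElement(long processingTimeNow) {
        if (earlyTimerAt == Long.MAX_VALUE) {
            earlyTimerAt = processingTimeNow + earlyDelayMillis; // register once
        }
        return Result.CONTINUE;
    }

    // Called as processing time advances.
    Result onProcessingTime(long processingTimeNow) {
        if (!earlyFired && !onTimeFired && processingTimeNow >= earlyTimerAt) {
            earlyFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    // Called as the watermark (event time) advances.
    Result onWatermark(long watermark) {
        if (!onTimeFired && watermark >= windowMaxTimestamp) {
            onTimeFired = true;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }
}
```

The early firing skips itself if the on-time firing already happened; whether that is the desired policy is a design choice this sketch does not settle.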
>>
>> These are good ideas but I'm afraid we still don't have a good solution.
>> This whole processing time/event time business is just very tricky.
>>
>> Cheers,
>> Aljoscha
>>